This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fbd13e9bade399b939d2b6c6ac3f23b8b31102bf
Author: Andrea Cosentino <[email protected]>
AuthorDate: Tue Dec 16 10:33:07 2025 +0100

    CAMEL-22786 - Camel-AWS: Extract common logic for client instantiation into 
a separate module - AWS Kinesis
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 components/camel-aws/camel-aws-common/pom.xml      |   5 +
 .../component/aws/common/AwsClientBuilderUtil.java |  95 +++++++++++++++++
 components/camel-aws/camel-aws2-kinesis/pom.xml    |   4 +
 .../firehose/KinesisFirehose2Configuration.java    |   3 +-
 .../aws2/firehose/KinesisFirehose2Endpoint.java    |   2 +-
 .../client/KinesisFirehoseClientFactory.java       |  28 ++---
 .../client/KinesisFirehoseInternalClient.java      |  32 ------
 .../KinesisFirehoseClientIAMOptimizedImpl.java     |  95 -----------------
 ...nesisFirehoseClientIAMProfileOptimizedImpl.java | 100 ------------------
 .../KinesisFirehoseClientSessionTokenImpl.java     | 111 --------------------
 .../impl/KinesisFirehoseClientStandardImpl.java    | 109 -------------------
 .../aws2/kinesis/Kinesis2Configuration.java        |   3 +-
 .../component/aws2/kinesis/KinesisConnection.java  |   4 +-
 .../kinesis/client/KinesisAsyncInternalClient.java |  31 ------
 .../aws2/kinesis/client/KinesisClientFactory.java  |  46 ++++-----
 .../aws2/kinesis/client/KinesisInternalClient.java |  32 ------
 .../impl/KinesisAsyncClientIAMOptimizedImpl.java   |  96 -----------------
 .../KinesisAsyncClientIAMProfileOptimizedImpl.java | 101 ------------------
 .../impl/KinesisAsyncClientSessionTokenImpl.java   | 115 ---------------------
 .../impl/KinesisAsyncClientStandardImpl.java       | 113 --------------------
 .../client/impl/KinesisClientIAMOptimizedImpl.java |  94 -----------------
 .../impl/KinesisClientIAMProfileOptimizedImpl.java |  99 ------------------
 .../client/impl/KinesisClientSessionTokenImpl.java | 111 --------------------
 .../client/impl/KinesisClientStandardImpl.java     | 109 -------------------
 .../firehose/KinesisFirehoseClientFactoryTest.java |  57 +++++-----
 .../aws2/kinesis/KinesisClientFactoryTest.java     |  66 ++++++------
 26 files changed, 201 insertions(+), 1460 deletions(-)

diff --git a/components/camel-aws/camel-aws-common/pom.xml 
b/components/camel-aws/camel-aws-common/pom.xml
index 5b4c0e1d1b97..98625ae5e012 100644
--- a/components/camel-aws/camel-aws-common/pom.xml
+++ b/components/camel-aws/camel-aws-common/pom.xml
@@ -67,6 +67,11 @@
             <artifactId>aws-core</artifactId>
             <version>${aws-java-sdk2-version}</version>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>netty-nio-client</artifactId>
+            <version>${aws-java-sdk2-version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/components/camel-aws/camel-aws-common/src/main/java/org/apache/camel/component/aws/common/AwsClientBuilderUtil.java
 
b/components/camel-aws/camel-aws-common/src/main/java/org/apache/camel/component/aws/common/AwsClientBuilderUtil.java
index cf08b59fb76f..932aacec7d78 100644
--- 
a/components/camel-aws/camel-aws-common/src/main/java/org/apache/camel/component/aws/common/AwsClientBuilderUtil.java
+++ 
b/components/camel-aws/camel-aws-common/src/main/java/org/apache/camel/component/aws/common/AwsClientBuilderUtil.java
@@ -31,11 +31,14 @@ import 
software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
 import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.builder.SdkAsyncClientBuilder;
 import software.amazon.awssdk.core.client.builder.SdkSyncClientBuilder;
 import software.amazon.awssdk.http.SdkHttpClient;
 import software.amazon.awssdk.http.SdkHttpConfigurationOption;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.http.apache.ProxyConfiguration;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.utils.AttributeMap;
 
@@ -143,6 +146,98 @@ public final class AwsClientBuilderUtil {
         return buildClient(config, builderSupplier, null);
     }
 
+    /**
+     * Build an AWS async client with the given configuration.
+     *
+     * @param  config                The common AWS configuration
+     * @param  builderSupplier       Supplier for the service-specific async 
client builder (e.g.,
+     *                               KinesisAsyncClient::builder)
+     * @param  serviceSpecificConfig Optional consumer for service-specific 
configuration
+     * @param  <B>                   The builder type (must extend both 
AwsClientBuilder and SdkAsyncClientBuilder)
+     * @param  <C>                   The client type
+     * @return                       The configured AWS async client
+     */
+    @SuppressWarnings("unchecked")
+    public static <B extends AwsClientBuilder<B, C> & SdkAsyncClientBuilder<B, 
C>, C extends SdkClient> C buildAsyncClient(
+            AwsCommonConfiguration config,
+            Supplier<B> builderSupplier,
+            Consumer<B> serviceSpecificConfig) {
+
+        B clientBuilder = builderSupplier.get();
+        NettyNioAsyncHttpClient.Builder httpClientBuilder = null;
+        boolean httpClientConfigured = false;
+
+        // 1. Configure proxy
+        if (ObjectHelper.isNotEmpty(config.getProxyHost())
+                && ObjectHelper.isNotEmpty(config.getProxyPort())) {
+            LOG.trace("Configuring async proxy: {}:{}", config.getProxyHost(), 
config.getProxyPort());
+            software.amazon.awssdk.http.nio.netty.ProxyConfiguration 
proxyConfig
+                    = 
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
+                            .scheme(config.getProxyProtocol().toString())
+                            .host(config.getProxyHost())
+                            .port(config.getProxyPort())
+                            .build();
+            httpClientBuilder = 
NettyNioAsyncHttpClient.builder().proxyConfiguration(proxyConfig);
+            httpClientConfigured = true;
+        }
+
+        // 2. Configure credentials
+        AwsCredentialsProvider credentialsProvider = 
resolveCredentialsProvider(config);
+        if (credentialsProvider != null) {
+            clientBuilder.credentialsProvider(credentialsProvider);
+        }
+
+        // 3. Apply HTTP client builder if configured (before trust all certs 
check)
+        if (httpClientConfigured) {
+            clientBuilder.httpClientBuilder(httpClientBuilder);
+        }
+
+        // 4. Configure region
+        if (ObjectHelper.isNotEmpty(config.getRegion())) {
+            clientBuilder.region(Region.of(config.getRegion()));
+        }
+
+        // 5. Configure endpoint override
+        if (config.isOverrideEndpoint() && 
ObjectHelper.isNotEmpty(config.getUriEndpointOverride())) {
+            
clientBuilder.endpointOverride(URI.create(config.getUriEndpointOverride()));
+        }
+
+        // 6. Configure trust all certificates
+        if (config.isTrustAllCertificates()) {
+            if (httpClientBuilder == null) {
+                httpClientBuilder = NettyNioAsyncHttpClient.builder();
+            }
+            SdkAsyncHttpClient asyncHttpClient = 
httpClientBuilder.buildWithDefaults(
+                    AttributeMap.builder()
+                            
.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE)
+                            .build());
+            clientBuilder.httpClient(asyncHttpClient);
+            clientBuilder.httpClientBuilder(null);
+        }
+
+        // 7. Apply service-specific configuration
+        if (serviceSpecificConfig != null) {
+            serviceSpecificConfig.accept(clientBuilder);
+        }
+
+        return clientBuilder.build();
+    }
+
+    /**
+     * Build an AWS async client with the given configuration, without 
service-specific configuration.
+     *
+     * @param  config          The common AWS configuration
+     * @param  builderSupplier Supplier for the service-specific async client 
builder
+     * @param  <B>             The builder type
+     * @param  <C>             The client type
+     * @return                 The configured AWS async client
+     */
+    public static <B extends AwsClientBuilder<B, C> & SdkAsyncClientBuilder<B, 
C>, C extends SdkClient> C buildAsyncClient(
+            AwsCommonConfiguration config,
+            Supplier<B> builderSupplier) {
+        return buildAsyncClient(config, builderSupplier, null);
+    }
+
     /**
      * Resolve the appropriate credentials provider based on configuration.
      * <p>
diff --git a/components/camel-aws/camel-aws2-kinesis/pom.xml 
b/components/camel-aws/camel-aws2-kinesis/pom.xml
index ac572cbe67ba..4a7ca805451b 100644
--- a/components/camel-aws/camel-aws2-kinesis/pom.xml
+++ b/components/camel-aws/camel-aws2-kinesis/pom.xml
@@ -33,6 +33,10 @@
     <description>Consuming and Producing data to AWS Kinesis 
Service</description>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws-common</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-support</artifactId>
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
index 3960113314d6..c2062cbe956c 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws2.firehose;
 
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.aws.common.AwsCommonConfiguration;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
@@ -25,7 +26,7 @@ import software.amazon.awssdk.core.Protocol;
 import software.amazon.awssdk.services.firehose.FirehoseClient;
 
 @UriParams
-public class KinesisFirehose2Configuration implements Cloneable {
+public class KinesisFirehose2Configuration implements Cloneable, 
AwsCommonConfiguration {
 
     @UriPath(description = "Name of the stream")
     @Metadata(required = true)
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
index d55f05501221..35e73ae69161 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
@@ -71,7 +71,7 @@ public class KinesisFirehose2Endpoint extends DefaultEndpoint 
implements Endpoin
         }
         kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() 
!= null
                 ? configuration.getAmazonKinesisFirehoseClient()
-                : 
KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration).getKinesisFirehoseClient();
+                : 
KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration);
 
     }
 
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseClientFactory.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseClientFactory.java
index 0992a8ce8c67..d8b815e0a55c 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseClientFactory.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseClientFactory.java
@@ -16,14 +16,12 @@
  */
 package org.apache.camel.component.aws2.firehose.client;
 
+import org.apache.camel.component.aws.common.AwsClientBuilderUtil;
 import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration;
-import 
org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientIAMOptimizedImpl;
-import 
org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientIAMProfileOptimizedImpl;
-import 
org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientSessionTokenImpl;
-import 
org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientStandardImpl;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
 
 /**
- * Factory class to return the correct type of AWS Kinesis client.
+ * Factory class to create AWS Kinesis Firehose clients using common 
configuration.
  */
 public final class KinesisFirehoseClientFactory {
 
@@ -31,20 +29,14 @@ public final class KinesisFirehoseClientFactory {
     }
 
     /**
-     * Return the correct aws Kinesis Firehose client (based on remote vs 
local).
+     * Create a Firehose client based on configuration.
      *
-     * @param  configuration configuration
-     * @return               FirehoseClient
+     * @param  configuration The Firehose configuration
+     * @return               Configured FirehoseClient
      */
-    public static KinesisFirehoseInternalClient 
getKinesisFirehoseClient(KinesisFirehose2Configuration configuration) {
-        if 
(Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) {
-            return new KinesisFirehoseClientIAMOptimizedImpl(configuration);
-        } else if 
(Boolean.TRUE.equals(configuration.isUseProfileCredentialsProvider())) {
-            return new 
KinesisFirehoseClientIAMProfileOptimizedImpl(configuration);
-        } else if 
(Boolean.TRUE.equals(configuration.isUseSessionCredentials())) {
-            return new KinesisFirehoseClientSessionTokenImpl(configuration);
-        } else {
-            return new KinesisFirehoseClientStandardImpl(configuration);
-        }
+    public static FirehoseClient 
getKinesisFirehoseClient(KinesisFirehose2Configuration configuration) {
+        return AwsClientBuilderUtil.buildClient(
+                configuration,
+                FirehoseClient::builder);
     }
 }
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseInternalClient.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseInternalClient.java
deleted file mode 100644
index 13e158a30adf..000000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseInternalClient.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.firehose.client;
-
-import software.amazon.awssdk.services.firehose.FirehoseClient;
-
-/**
- * Manage the required actions of a Kinesis Firehose client for either local 
or remote.
- */
-public interface KinesisFirehoseInternalClient {
-
-    /**
-     * Returns a Kinesis Firehose client after a factory method determines 
which one to return.
-     *
-     * @return FirehoseClient client
-     */
-    FirehoseClient getKinesisFirehoseClient();
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMOptimizedImpl.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMOptimizedImpl.java
deleted file mode 100644
index 9d7dba54eeaf..000000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMOptimizedImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.firehose.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration;
-import 
org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.firehose.FirehoseClient;
-import software.amazon.awssdk.services.firehose.FirehoseClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis Firehose client for all users to use (enabling 
temporary creds). This implementation is for
- * remote instances to manage the credentials on their own (eliminating 
credential rotations)
- */
-public class KinesisFirehoseClientIAMOptimizedImpl implements 
KinesisFirehoseInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisFirehoseClientIAMOptimizedImpl.class);
-    private KinesisFirehose2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisFirehoseClientIAMOptimizedImpl(KinesisFirehose2Configuration 
configuration) {
-        LOG.trace(
-                "Creating an AWS Kinesis Firehose client for an ec2 instance 
with IAM temporary credentials (normal for ec2s).");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis client that is used.
-     *
-     * @return Amazon Kinesis Client.
-     */
-    @Override
-    public FirehoseClient getKinesisFirehoseClient() {
-        FirehoseClient client = null;
-        FirehoseClientBuilder clientBuilder = FirehoseClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMProfileOptimizedImpl.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMProfileOptimizedImpl.java
deleted file mode 100644
index a721694db653..000000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMProfileOptimizedImpl.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.firehose.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration;
-import 
org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.firehose.FirehoseClient;
-import software.amazon.awssdk.services.firehose.FirehoseClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis Firehose client for all users to use (enabling 
temporary creds). This implementation is for
- * remote instances to manage the credentials on their own (eliminating 
credential rotations)
- */
-public class KinesisFirehoseClientIAMProfileOptimizedImpl implements 
KinesisFirehoseInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisFirehoseClientIAMProfileOptimizedImpl.class);
-    private KinesisFirehose2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public 
KinesisFirehoseClientIAMProfileOptimizedImpl(KinesisFirehose2Configuration 
configuration) {
-        LOG.trace(
-                "Creating an AWS Kinesis Firehose client for an ec2 instance 
with IAM temporary credentials (normal for ec2s).");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis client that is used.
-     *
-     * @return Amazon Kinesis Client.
-     */
-    @Override
-    public FirehoseClient getKinesisFirehoseClient() {
-        FirehoseClient client = null;
-        FirehoseClientBuilder clientBuilder = FirehoseClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
-        }
-        if (configuration.getProfileCredentialsName() != null) {
-            clientBuilder = clientBuilder
-                    
.credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName()));
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientSessionTokenImpl.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientSessionTokenImpl.java
deleted file mode 100644
index 304246620054..000000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientSessionTokenImpl.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.firehose.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration;
-import 
org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.firehose.FirehoseClient;
-import software.amazon.awssdk.services.firehose.FirehoseClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis Firehose client for all users to use. This 
implementation is for local instances to use a
- * static and solid credential set.
- */
-public class KinesisFirehoseClientSessionTokenImpl implements 
KinesisFirehoseInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisFirehoseClientSessionTokenImpl.class);
-    private KinesisFirehose2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisFirehoseClientSessionTokenImpl(KinesisFirehose2Configuration 
configuration) {
-        LOG.trace("Creating an AWS Kinesis Firehose manager using static 
credentials.");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis Firehose client that is used.
-     *
-     * @return Amazon Kinesis Firehose Client.
-     */
-    @Override
-    public FirehoseClient getKinesisFirehoseClient() {
-        FirehoseClient client = null;
-        FirehoseClientBuilder clientBuilder = FirehoseClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-        boolean isClientConfigFound = false;
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            isClientConfigFound = true;
-        }
-        if (configuration.getAccessKey() != null && 
configuration.getSecretKey() != null
-                && configuration.getSessionToken() != null) {
-            AwsSessionCredentials cred = 
AwsSessionCredentials.create(configuration.getAccessKey(),
-                    configuration.getSecretKey(), 
configuration.getSessionToken());
-            if (isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder)
-                        
.credentialsProvider(StaticCredentialsProvider.create(cred));
-            } else {
-                clientBuilder = 
clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
-            }
-        } else {
-            if (!isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder);
-            }
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientStandardImpl.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientStandardImpl.java
deleted file mode 100644
index 1761b1e4c9fb..000000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientStandardImpl.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.firehose.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration;
-import 
org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.firehose.FirehoseClient;
-import software.amazon.awssdk.services.firehose.FirehoseClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis Firehose client for all users to use. This 
implementation is for local instances to use a
- * static and solid credential set.
- */
-public class KinesisFirehoseClientStandardImpl implements 
KinesisFirehoseInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisFirehoseClientStandardImpl.class);
-    private KinesisFirehose2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisFirehoseClientStandardImpl(KinesisFirehose2Configuration 
configuration) {
-        LOG.trace("Creating an AWS Kinesis Firehose manager using static 
credentials.");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis Firehose client that is used.
-     *
-     * @return Amazon Kinesis Firehose Client.
-     */
-    @Override
-    public FirehoseClient getKinesisFirehoseClient() {
-        FirehoseClient client = null;
-        FirehoseClientBuilder clientBuilder = FirehoseClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-        boolean isClientConfigFound = false;
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            isClientConfigFound = true;
-        }
-        if (configuration.getAccessKey() != null && 
configuration.getSecretKey() != null) {
-            AwsBasicCredentials cred = 
AwsBasicCredentials.create(configuration.getAccessKey(), 
configuration.getSecretKey());
-            if (isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder)
-                        
.credentialsProvider(StaticCredentialsProvider.create(cred));
-            } else {
-                clientBuilder = 
clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
-            }
-        } else {
-            if (!isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder);
-            }
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index 2a8273783b13..072e9c2dc04a 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws2.kinesis;
 
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.aws.common.AwsCommonConfiguration;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
@@ -29,7 +30,7 @@ import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 @UriParams
-public class Kinesis2Configuration implements Cloneable {
+public class Kinesis2Configuration implements Cloneable, AwsCommonConfiguration {
 
     @UriPath(description = "Name of the stream")
     @Metadata(required = true)
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
index b265c9632cd8..b02cab0b8b7e 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
@@ -41,7 +41,7 @@ public class KinesisConnection implements Closeable {
             if (Objects.isNull(kinesisClient)) {
                 kinesisClient = endpoint.getConfiguration().getAmazonKinesisClient() != null
                         ? endpoint.getConfiguration().getAmazonKinesisClient()
-                        : KinesisClientFactory.getKinesisClient(endpoint.getConfiguration()).getKinesisClient();
+                        : KinesisClientFactory.getKinesisClient(endpoint.getConfiguration());
             }
             return kinesisClient;
         } finally {
@@ -55,7 +55,7 @@ public class KinesisConnection implements Closeable {
             if (Objects.isNull(kinesisAsyncClient)) {
                 kinesisAsyncClient = endpoint.getConfiguration().getAmazonKinesisAsyncClient() != null
                         ? endpoint.getConfiguration().getAmazonKinesisAsyncClient()
-                        : KinesisClientFactory.getKinesisAsyncClient(endpoint.getConfiguration()).getKinesisAsyncClient();
+                        : KinesisClientFactory.getKinesisAsyncClient(endpoint.getConfiguration());
             }
             return kinesisAsyncClient;
         } finally {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java
deleted file mode 100644
index 963fe277fb8a..000000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client;
-
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-
-/**
- * Manage the required actions of a Kinesis Async client for either local or remote.
- */
-public interface KinesisAsyncInternalClient {
-    /**
-     * Returns a Kinesis Async client.
-     *
-     * @return KinesisAsyncClient client
-     */
-    KinesisAsyncClient getKinesisAsyncClient();
-}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
index e8233c67b5f7..66d65d709c83 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.component.aws2.kinesis.client;
 
+import org.apache.camel.component.aws.common.AwsClientBuilderUtil;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
-import org.apache.camel.component.aws2.kinesis.client.impl.*;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
 
 /**
- * Factory class to return the correct type of AWS Kinesis client.
+ * Factory class to create AWS Kinesis clients using common configuration.
  */
 public final class KinesisClientFactory {
 
@@ -28,38 +30,26 @@ public final class KinesisClientFactory {
     }
 
     /**
-     * Return the correct aws Kinesis client (based on remote vs local).
+     * Create a Kinesis sync client based on configuration.
      *
-     * @param  configuration configuration
-     * @return               KinesisClient
+     * @param  configuration The Kinesis configuration
+     * @return               Configured KinesisClient
      */
-    public static KinesisInternalClient getKinesisClient(Kinesis2Configuration configuration) {
-        if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) {
-            return new KinesisClientIAMOptimizedImpl(configuration);
-        } else if (Boolean.TRUE.equals(configuration.isUseProfileCredentialsProvider())) {
-            return new KinesisClientIAMProfileOptimizedImpl(configuration);
-        } else if (Boolean.TRUE.equals(configuration.isUseSessionCredentials())) {
-            return new KinesisClientSessionTokenImpl(configuration);
-        } else {
-            return new KinesisClientStandardImpl(configuration);
-        }
+    public static KinesisClient getKinesisClient(Kinesis2Configuration configuration) {
+        return AwsClientBuilderUtil.buildClient(
+                configuration,
+                KinesisClient::builder);
     }
 
     /**
-     * Return the standard aws Kinesis Async client.
+     * Create a Kinesis async client based on configuration.
      *
-     * @param  configuration configuration
-     * @return               KinesisAsyncClient
+     * @param  configuration The Kinesis configuration
+     * @return               Configured KinesisAsyncClient
      */
-    public static KinesisAsyncInternalClient getKinesisAsyncClient(Kinesis2Configuration configuration) {
-        if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) {
-            return new KinesisAsyncClientIAMOptimizedImpl(configuration);
-        } else if (Boolean.TRUE.equals(configuration.isUseProfileCredentialsProvider())) {
-            return new KinesisAsyncClientIAMProfileOptimizedImpl(configuration);
-        } else if (Boolean.TRUE.equals(configuration.isUseSessionCredentials())) {
-            return new KinesisAsyncClientSessionTokenImpl(configuration);
-        } else {
-            return new KinesisAsyncClientStandardImpl(configuration);
-        }
+    public static KinesisAsyncClient getKinesisAsyncClient(Kinesis2Configuration configuration) {
+        return AwsClientBuilderUtil.buildAsyncClient(
+                configuration,
+                KinesisAsyncClient::builder);
     }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java
deleted file mode 100644
index e22cf6d723b6..000000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client;
-
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-
-/**
- * Manage the required actions of a Kinesis client for either local or remote.
- */
-public interface KinesisInternalClient {
-
-    /**
-     * Returns a Kinesis client after a factory method determines which one to return.
-     *
-     * @return KinesisClient client
-     */
-    KinesisClient getKinesisClient();
-}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java
deleted file mode 100644
index 0f68516d31d3..000000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
-import 
org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
-import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis Async client for all users to use (enabling temporary 
creds). This implementation is for remote
- * instances to manage the credentials on their own (eliminating credential 
rotations)
- */
-public class KinesisAsyncClientIAMOptimizedImpl implements 
KinesisAsyncInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisAsyncClientIAMOptimizedImpl.class);
-    private Kinesis2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisAsyncClientIAMOptimizedImpl(Kinesis2Configuration 
configuration) {
-        LOG.trace("Creating an AWS Kinesis Async client for an ec2 instance 
with IAM temporary credentials (normal for ec2s).");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis Async client that is used.
-     *
-     * @return Amazon Kinesis Async Client.
-     */
-    @Override
-    public KinesisAsyncClient getKinesisAsyncClient() {
-        var clientBuilder = KinesisAsyncClient.builder();
-        SdkAsyncHttpClient.Builder httpClientBuilder = null;
-
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            var proxyConfig = ProxyConfiguration
-                    .builder()
-                    .scheme(configuration.getProxyProtocol().toString())
-                    .host(configuration.getProxyHost())
-                    .port(configuration.getProxyPort())
-                    .build();
-            httpClientBuilder = NettyNioAsyncHttpClient
-                    .builder()
-                    .proxyConfiguration(proxyConfig);
-            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = NettyNioAsyncHttpClient.builder();
-            }
-            SdkAsyncHttpClient ahc = httpClientBuilder
-                    .buildWithDefaults(AttributeMap
-                            .builder()
-                            .put(
-                                    
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                                    Boolean.TRUE)
-                            .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        return clientBuilder.build();
-    }
-
-}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java
deleted file mode 100644
index 3153f0684729..000000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
-import 
org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
-import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis Async client for all users to use (enabling temporary 
creds). This implementation is for remote
- * instances to manage the credentials on their own (eliminating credential 
rotations)
- */
-public class KinesisAsyncClientIAMProfileOptimizedImpl implements 
KinesisAsyncInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisAsyncClientIAMProfileOptimizedImpl.class);
-    private Kinesis2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisAsyncClientIAMProfileOptimizedImpl(Kinesis2Configuration 
configuration) {
-        LOG.trace("Creating an AWS Kinesis Async client for an ec2 instance 
with IAM temporary credentials (normal for ec2s).");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the KinesisAsync client that is used.
-     *
-     * @return Amazon Kinesis Async Client.
-     */
-    @Override
-    public KinesisAsyncClient getKinesisAsyncClient() {
-        var clientBuilder = KinesisAsyncClient.builder();
-        SdkAsyncHttpClient.Builder httpClientBuilder = null;
-
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            var proxyConfig = ProxyConfiguration
-                    .builder()
-                    .scheme(configuration.getProxyProtocol().toString())
-                    .host(configuration.getProxyHost())
-                    .port(configuration.getProxyPort())
-                    .build();
-            httpClientBuilder = NettyNioAsyncHttpClient
-                    .builder()
-                    .proxyConfiguration(proxyConfig);
-            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
-        }
-        if (configuration.getProfileCredentialsName() != null) {
-            clientBuilder = clientBuilder
-                    
.credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName()));
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = NettyNioAsyncHttpClient.builder();
-            }
-            SdkAsyncHttpClient ahc = httpClientBuilder
-                    .buildWithDefaults(AttributeMap
-                            .builder()
-                            .put(
-                                    
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                                    Boolean.TRUE)
-                            .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        return clientBuilder.build();
-    }
-
-}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientSessionTokenImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientSessionTokenImpl.java
deleted file mode 100644
index d6db29ad541a..000000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientSessionTokenImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client.impl;
-
-import java.net.URI;
-import java.util.Objects;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
-import 
org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
-import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Async Kinesis client for all users to use. This 
implementation is for local instances to use a static
- * and solid credential set.
- */
-public class KinesisAsyncClientSessionTokenImpl implements 
KinesisAsyncInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisAsyncClientSessionTokenImpl.class);
-    private Kinesis2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisAsyncClientSessionTokenImpl(Kinesis2Configuration 
configuration) {
-        LOG.trace("Creating an AWS Async Kinesis manager using static 
credentials.");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis Async client that is used.
-     *
-     * @return Amazon Kinesis Async Client.
-     */
-    @Override
-    public KinesisAsyncClient getKinesisAsyncClient() {
-        var clientBuilder = KinesisAsyncClient.builder();
-        var isClientConfigFound = false;
-        SdkAsyncHttpClient.Builder httpClientBuilder = null;
-
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            var proxyConfig = ProxyConfiguration
-                    .builder()
-                    .scheme(configuration.getProxyProtocol().toString())
-                    .host(configuration.getProxyHost())
-                    .port(configuration.getProxyPort())
-                    .build();
-            httpClientBuilder = NettyNioAsyncHttpClient
-                    .builder()
-                    .proxyConfiguration(proxyConfig);
-            isClientConfigFound = true;
-        }
-        if (Objects.nonNull(configuration.getAccessKey()) && 
Objects.nonNull(configuration.getSecretKey())
-                && Objects.nonNull(configuration.getSessionToken())) {
-            var cred = 
AwsSessionCredentials.create(configuration.getAccessKey(), 
configuration.getSecretKey(),
-                    configuration.getSessionToken());
-            if (isClientConfigFound) {
-                clientBuilder = clientBuilder
-                        .httpClientBuilder(httpClientBuilder)
-                        
.credentialsProvider(StaticCredentialsProvider.create(cred));
-            } else {
-                clientBuilder = 
clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
-            }
-        } else {
-            if (!isClientConfigFound) {
-                clientBuilder = clientBuilder.httpClientBuilder(null);
-            }
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = NettyNioAsyncHttpClient.builder();
-            }
-            SdkAsyncHttpClient ahc = httpClientBuilder
-                    .buildWithDefaults(AttributeMap
-                            .builder()
-                            .put(
-                                    
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                                    Boolean.TRUE)
-                            .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        return clientBuilder.build();
-    }
-}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java
deleted file mode 100644
index b743e66ea6c3..000000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client.impl;
-
-import java.net.URI;
-import java.util.Objects;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
-import 
org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
-import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Async Kinesis client for all users to use. This 
implementation is for local instances to use a static
- * and solid credential set.
- */
-public class KinesisAsyncClientStandardImpl implements 
KinesisAsyncInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisAsyncClientStandardImpl.class);
-    private Kinesis2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisAsyncClientStandardImpl(Kinesis2Configuration configuration) 
{
-        LOG.trace("Creating an AWS Async Kinesis manager using static 
credentials.");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis Async client that is used.
-     *
-     * @return Amazon Kinesis Async Client.
-     */
-    @Override
-    public KinesisAsyncClient getKinesisAsyncClient() {
-        var clientBuilder = KinesisAsyncClient.builder();
-        var isClientConfigFound = false;
-        SdkAsyncHttpClient.Builder httpClientBuilder = null;
-
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            var proxyConfig = ProxyConfiguration
-                    .builder()
-                    .scheme(configuration.getProxyProtocol().toString())
-                    .host(configuration.getProxyHost())
-                    .port(configuration.getProxyPort())
-                    .build();
-            httpClientBuilder = NettyNioAsyncHttpClient
-                    .builder()
-                    .proxyConfiguration(proxyConfig);
-            isClientConfigFound = true;
-        }
-        if (Objects.nonNull(configuration.getAccessKey()) && 
Objects.nonNull(configuration.getSecretKey())) {
-            var cred = 
AwsBasicCredentials.create(configuration.getAccessKey(), 
configuration.getSecretKey());
-            if (isClientConfigFound) {
-                clientBuilder = clientBuilder
-                        .httpClientBuilder(httpClientBuilder)
-                        
.credentialsProvider(StaticCredentialsProvider.create(cred));
-            } else {
-                clientBuilder = 
clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
-            }
-        } else {
-            if (!isClientConfigFound) {
-                clientBuilder = clientBuilder.httpClientBuilder(null);
-            }
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = NettyNioAsyncHttpClient.builder();
-            }
-            SdkAsyncHttpClient ahc = httpClientBuilder
-                    .buildWithDefaults(AttributeMap
-                            .builder()
-                            .put(
-                                    
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                                    Boolean.TRUE)
-                            .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        return clientBuilder.build();
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java
deleted file mode 100644
index f4ce342dd2d7..000000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
-import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis client for all users to use (enabling temporary 
creds). This implementation is for remote
- * instances to manage the credentials on their own (eliminating credential 
rotations)
- */
-public class KinesisClientIAMOptimizedImpl implements KinesisInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisClientIAMOptimizedImpl.class);
-    private Kinesis2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisClientIAMOptimizedImpl(Kinesis2Configuration configuration) {
-        LOG.trace("Creating an AWS Kinesis client for an ec2 instance with IAM 
temporary credentials (normal for ec2s).");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis client that is used.
-     *
-     * @return Amazon Kinesis Client.
-     */
-    @Override
-    public KinesisClient getKinesisClient() {
-        KinesisClient client = null;
-        KinesisClientBuilder clientBuilder = KinesisClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMProfileOptimizedImpl.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMProfileOptimizedImpl.java
deleted file mode 100644
index a5c9cd6689b1..000000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMProfileOptimizedImpl.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
-import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis client for all users to use (enabling temporary 
creds). This implementation is for remote
- * instances to manage the credentials on their own (eliminating credential 
rotations)
- */
-public class KinesisClientIAMProfileOptimizedImpl implements 
KinesisInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisClientIAMProfileOptimizedImpl.class);
-    private Kinesis2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisClientIAMProfileOptimizedImpl(Kinesis2Configuration 
configuration) {
-        LOG.trace("Creating an AWS Kinesis client for an ec2 instance with IAM 
temporary credentials (normal for ec2s).");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis client that is used.
-     *
-     * @return Amazon Kinesis Client.
-     */
-    @Override
-    public KinesisClient getKinesisClient() {
-        KinesisClient client = null;
-        KinesisClientBuilder clientBuilder = KinesisClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
-        }
-        if (configuration.getProfileCredentialsName() != null) {
-            clientBuilder = clientBuilder
-                    
.credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName()));
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientSessionTokenImpl.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientSessionTokenImpl.java
deleted file mode 100644
index 41eeffb9b576..000000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientSessionTokenImpl.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
-import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis client for all users to use. This implementation is 
for local instances to use a static and
- * solid credential set.
- */
-public class KinesisClientSessionTokenImpl implements KinesisInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisClientSessionTokenImpl.class);
-    private Kinesis2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisClientSessionTokenImpl(Kinesis2Configuration configuration) {
-        LOG.trace("Creating an AWS Kinesis manager using static credentials.");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis client that is used.
-     *
-     * @return Amazon Kinesis Client.
-     */
-    @Override
-    public KinesisClient getKinesisClient() {
-        KinesisClient client = null;
-        KinesisClientBuilder clientBuilder = KinesisClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-        boolean isClientConfigFound = false;
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            isClientConfigFound = true;
-        }
-        if (configuration.getAccessKey() != null && 
configuration.getSecretKey() != null
-                && configuration.getSessionToken() != null) {
-            AwsSessionCredentials cred = 
AwsSessionCredentials.create(configuration.getAccessKey(),
-                    configuration.getSecretKey(), 
configuration.getSessionToken());
-            if (isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder)
-                        
.credentialsProvider(StaticCredentialsProvider.create(cred));
-            } else {
-                clientBuilder = 
clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
-            }
-        } else {
-            if (!isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder);
-            }
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java
deleted file mode 100644
index 5858f852233e..000000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
-import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS Kinesis client for all users to use. This implementation is 
for local instances to use a static and
- * solid credential set.
- */
-public class KinesisClientStandardImpl implements KinesisInternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisClientStandardImpl.class);
-    private Kinesis2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public KinesisClientStandardImpl(Kinesis2Configuration configuration) {
-        LOG.trace("Creating an AWS Kinesis manager using static credentials.");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kinesis client that is used.
-     *
-     * @return Amazon Kinesis Client.
-     */
-    @Override
-    public KinesisClient getKinesisClient() {
-        KinesisClient client = null;
-        KinesisClientBuilder clientBuilder = KinesisClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-        boolean isClientConfigFound = false;
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            isClientConfigFound = true;
-        }
-        if (configuration.getAccessKey() != null && 
configuration.getSecretKey() != null) {
-            AwsBasicCredentials cred = 
AwsBasicCredentials.create(configuration.getAccessKey(), 
configuration.getSecretKey());
-            if (isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder)
-                        
.credentialsProvider(StaticCredentialsProvider.create(cred));
-            } else {
-                clientBuilder = 
clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
-            }
-        } else {
-            if (!isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder);
-            }
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseClientFactoryTest.java
 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseClientFactoryTest.java
index 75872d01c32c..97476556500f 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseClientFactoryTest.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseClientFactoryTest.java
@@ -17,48 +17,43 @@
 package org.apache.camel.component.aws2.firehose;
 
 import 
org.apache.camel.component.aws2.firehose.client.KinesisFirehoseClientFactory;
-import 
org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient;
-import 
org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientIAMOptimizedImpl;
-import 
org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientSessionTokenImpl;
-import 
org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientStandardImpl;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class KinesisFirehoseClientFactoryTest {
 
     @Test
-    public void getStandardFirehoseClientDefault() {
-        KinesisFirehose2Configuration kinesis2Configuration = new 
KinesisFirehose2Configuration();
-        KinesisFirehoseInternalClient kinesisFirehoseClient
-                = 
KinesisFirehoseClientFactory.getKinesisFirehoseClient(kinesis2Configuration);
-        assertTrue(kinesisFirehoseClient instanceof 
KinesisFirehoseClientStandardImpl);
+    public void getFirehoseClientWithDefaultCredentials() {
+        KinesisFirehose2Configuration configuration = new 
KinesisFirehose2Configuration();
+        configuration.setUseDefaultCredentialsProvider(true);
+        configuration.setRegion("eu-west-1");
+        FirehoseClient firehoseClient = 
KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration);
+        assertNotNull(firehoseClient);
+        firehoseClient.close();
     }
 
     @Test
-    public void getStandardFirehoseClient() {
-        KinesisFirehose2Configuration kinesis2Configuration = new 
KinesisFirehose2Configuration();
-        kinesis2Configuration.setUseDefaultCredentialsProvider(false);
-        KinesisFirehoseInternalClient kinesisFirehoseClient
-                = 
KinesisFirehoseClientFactory.getKinesisFirehoseClient(kinesis2Configuration);
-        assertTrue(kinesisFirehoseClient instanceof 
KinesisFirehoseClientStandardImpl);
+    public void getFirehoseClientWithStaticCredentials() {
+        KinesisFirehose2Configuration configuration = new 
KinesisFirehose2Configuration();
+        configuration.setAccessKey("testAccessKey");
+        configuration.setSecretKey("testSecretKey");
+        configuration.setRegion("eu-west-1");
+        FirehoseClient firehoseClient = 
KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration);
+        assertNotNull(firehoseClient);
+        firehoseClient.close();
     }
 
     @Test
-    public void getIAMOptimizedFirehoseClient() {
-        KinesisFirehose2Configuration kinesis2Configuration = new 
KinesisFirehose2Configuration();
-        kinesis2Configuration.setUseDefaultCredentialsProvider(true);
-        KinesisFirehoseInternalClient kinesisFirehoseClient
-                = 
KinesisFirehoseClientFactory.getKinesisFirehoseClient(kinesis2Configuration);
-        assertTrue(kinesisFirehoseClient instanceof 
KinesisFirehoseClientIAMOptimizedImpl);
-    }
-
-    @Test
-    public void getSessionTokenFirehoseClient() {
-        KinesisFirehose2Configuration kinesis2Configuration = new 
KinesisFirehose2Configuration();
-        kinesis2Configuration.setUseSessionCredentials(true);
-        KinesisFirehoseInternalClient kinesisFirehoseClient
-                = 
KinesisFirehoseClientFactory.getKinesisFirehoseClient(kinesis2Configuration);
-        assertTrue(kinesisFirehoseClient instanceof 
KinesisFirehoseClientSessionTokenImpl);
+    public void getFirehoseClientWithEndpointOverride() {
+        KinesisFirehose2Configuration configuration = new 
KinesisFirehose2Configuration();
+        configuration.setUseDefaultCredentialsProvider(true);
+        configuration.setRegion("eu-west-1");
+        configuration.setOverrideEndpoint(true);
+        configuration.setUriEndpointOverride("http://localhost:4566");
+        FirehoseClient firehoseClient = 
KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration);
+        assertNotNull(firehoseClient);
+        firehoseClient.close();
     }
 }
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
index f9ad5b8e5a7f..d77fa7911985 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
@@ -16,60 +16,66 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
-import 
org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
 import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
-import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient;
-import org.apache.camel.component.aws2.kinesis.client.impl.*;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 class KinesisClientFactoryTest {
 
     @Test
-    void getStandardKinesisClientDefault() {
+    void getKinesisClientWithDefaultCredentials() {
         Kinesis2Configuration kinesis2Configuration = new 
Kinesis2Configuration();
-        KinesisInternalClient kinesisClient = 
KinesisClientFactory.getKinesisClient(kinesis2Configuration);
-        assertTrue(kinesisClient instanceof KinesisClientStandardImpl);
+        kinesis2Configuration.setUseDefaultCredentialsProvider(true);
+        kinesis2Configuration.setRegion("eu-west-1");
+        KinesisClient kinesisClient = 
KinesisClientFactory.getKinesisClient(kinesis2Configuration);
+        assertNotNull(kinesisClient);
+        kinesisClient.close();
     }
 
     @Test
-    void getStandardKinesisClient() {
+    void getKinesisClientWithStaticCredentials() {
         Kinesis2Configuration kinesis2Configuration = new 
Kinesis2Configuration();
-        kinesis2Configuration.setUseDefaultCredentialsProvider(false);
-        KinesisInternalClient kinesisClient = 
KinesisClientFactory.getKinesisClient(kinesis2Configuration);
-        assertTrue(kinesisClient instanceof KinesisClientStandardImpl);
+        kinesis2Configuration.setAccessKey("testAccessKey");
+        kinesis2Configuration.setSecretKey("testSecretKey");
+        kinesis2Configuration.setRegion("eu-west-1");
+        KinesisClient kinesisClient = 
KinesisClientFactory.getKinesisClient(kinesis2Configuration);
+        assertNotNull(kinesisClient);
+        kinesisClient.close();
     }
 
     @Test
-    void getIAMOptimizedKinesisClient() {
+    void getKinesisAsyncClientWithDefaultCredentials() {
         Kinesis2Configuration kinesis2Configuration = new 
Kinesis2Configuration();
         kinesis2Configuration.setUseDefaultCredentialsProvider(true);
-        KinesisInternalClient kinesisClient = 
KinesisClientFactory.getKinesisClient(kinesis2Configuration);
-        assertTrue(kinesisClient instanceof KinesisClientIAMOptimizedImpl);
-    }
-
-    @Test
-    void getSessionTokenKinesisClient() {
-        Kinesis2Configuration kinesis2Configuration = new 
Kinesis2Configuration();
-        kinesis2Configuration.setUseSessionCredentials(true);
-        KinesisInternalClient kinesisClient = 
KinesisClientFactory.getKinesisClient(kinesis2Configuration);
-        assertTrue(kinesisClient instanceof KinesisClientSessionTokenImpl);
+        kinesis2Configuration.setRegion("eu-west-1");
+        KinesisAsyncClient kinesisAsyncClient = 
KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration);
+        assertNotNull(kinesisAsyncClient);
+        kinesisAsyncClient.close();
     }
 
     @Test
-    void getSessionTokenAsyncKinesisClient() {
+    void getKinesisAsyncClientWithStaticCredentials() {
         Kinesis2Configuration kinesis2Configuration = new 
Kinesis2Configuration();
-        kinesis2Configuration.setUseSessionCredentials(true);
-        KinesisAsyncInternalClient kinesisClient = 
KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration);
-        assertTrue(kinesisClient instanceof 
KinesisAsyncClientSessionTokenImpl);
+        kinesis2Configuration.setAccessKey("testAccessKey");
+        kinesis2Configuration.setSecretKey("testSecretKey");
+        kinesis2Configuration.setRegion("eu-west-1");
+        KinesisAsyncClient kinesisAsyncClient = 
KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration);
+        assertNotNull(kinesisAsyncClient);
+        kinesisAsyncClient.close();
     }
 
     @Test
-    void getStandardKinesisAsyncClient() {
+    void getKinesisClientWithEndpointOverride() {
         Kinesis2Configuration kinesis2Configuration = new 
Kinesis2Configuration();
-        kinesis2Configuration.setAsyncClient(true);
-        KinesisAsyncInternalClient kinesisClient = 
KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration);
-        assertTrue(kinesisClient instanceof KinesisAsyncClientStandardImpl);
+        kinesis2Configuration.setUseDefaultCredentialsProvider(true);
+        kinesis2Configuration.setRegion("eu-west-1");
+        kinesis2Configuration.setOverrideEndpoint(true);
+        kinesis2Configuration.setUriEndpointOverride("http://localhost:4566");
+        KinesisClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration);
+        assertNotNull(kinesisClient);
+        kinesisClient.close();
     }
 }

Reply via email to