This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7d6ddffbe425421b7a43ec3150de162e349a836d
Author: Andrea Cosentino <[email protected]>
AuthorDate: Tue Dec 16 15:29:30 2025 +0100

    CAMEL-22786 - Camel-AWS: Extract common logic for clients instantiation in a separated module - AWS MSK
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 components/camel-aws/camel-aws2-msk/pom.xml        |   4 +
 .../component/aws2/msk/MSK2Configuration.java      |   9 +-
 .../camel/component/aws2/msk/MSK2Endpoint.java     |   2 +-
 .../aws2/msk/client/MSK2ClientFactory.java         |  28 ++----
 .../aws2/msk/client/MSK2InternalClient.java        |  32 ------
 .../msk/client/impl/MSK2ClientOptimizedImpl.java   |  93 -----------------
 .../impl/MSK2ClientProfileOptimizedImpl.java       |  98 ------------------
 .../client/impl/MSK2ClientSessionTokenImpl.java    | 111 ---------------------
 .../msk/client/impl/MSK2ClientStandardImpl.java    | 109 --------------------
 .../component/aws2/msk/MSK2ClientFactoryTest.java  |  53 +++++-----
 10 files changed, 47 insertions(+), 492 deletions(-)

diff --git a/components/camel-aws/camel-aws2-msk/pom.xml b/components/camel-aws/camel-aws2-msk/pom.xml
index 3ba40fbb6af4..e97b72574740 100644
--- a/components/camel-aws/camel-aws2-msk/pom.xml
+++ b/components/camel-aws/camel-aws2-msk/pom.xml
@@ -40,6 +40,10 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-support</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws-common</artifactId>
+        </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>kafka</artifactId>
diff --git a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Configuration.java b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Configuration.java
index ab383fc2afcd..4c77bcfdd522 100644
--- a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Configuration.java
+++ b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Configuration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws2.msk;
 
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.aws.common.AwsCommonConfiguration;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
@@ -25,7 +26,7 @@ import software.amazon.awssdk.core.Protocol;
 import software.amazon.awssdk.services.kafka.KafkaClient;
 
 @UriParams
-public class MSK2Configuration implements Cloneable {
+public class MSK2Configuration implements Cloneable, AwsCommonConfiguration {
 
     @UriPath(description = "Logical name")
     @Metadata(required = true)
@@ -217,14 +218,16 @@ public class MSK2Configuration implements Cloneable {
      * Set whether the Kafka client should expect to load credentials through a default credentials provider or to
      * expect static credentials to be passed in.
      */
-    public void setUseDefaultCredentialsProvider(Boolean useDefaultCredentialsProvider) {
+    public void setUseDefaultCredentialsProvider(boolean useDefaultCredentialsProvider) {
         this.useDefaultCredentialsProvider = useDefaultCredentialsProvider;
     }
 
-    public Boolean isUseDefaultCredentialsProvider() {
+    @Override
+    public boolean isUseDefaultCredentialsProvider() {
         return useDefaultCredentialsProvider;
     }
 
+    @Override
     public boolean isUseProfileCredentialsProvider() {
         return useProfileCredentialsProvider;
     }
diff --git a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Endpoint.java b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Endpoint.java
index 0b739c572c29..d4c34d4c2eba 100644
--- a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Endpoint.java
+++ b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Endpoint.java
@@ -68,7 +68,7 @@ public class MSK2Endpoint extends ScheduledPollEndpoint implements EndpointServi
 
         mskClient = configuration.getMskClient() != null
                 ? configuration.getMskClient()
-                : MSK2ClientFactory.getKafkaClient(configuration).getKafkaClient();
+                : MSK2ClientFactory.getKafkaClient(configuration);
     }
 
     @Override
diff --git a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/MSK2ClientFactory.java b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/MSK2ClientFactory.java
index a344541f508a..3f413ecaff06 100644
--- a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/MSK2ClientFactory.java
+++ b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/MSK2ClientFactory.java
@@ -16,14 +16,12 @@
  */
 package org.apache.camel.component.aws2.msk.client;
 
+import org.apache.camel.component.aws.common.AwsClientBuilderUtil;
 import org.apache.camel.component.aws2.msk.MSK2Configuration;
-import org.apache.camel.component.aws2.msk.client.impl.MSK2ClientOptimizedImpl;
-import org.apache.camel.component.aws2.msk.client.impl.MSK2ClientProfileOptimizedImpl;
-import org.apache.camel.component.aws2.msk.client.impl.MSK2ClientSessionTokenImpl;
-import org.apache.camel.component.aws2.msk.client.impl.MSK2ClientStandardImpl;
+import software.amazon.awssdk.services.kafka.KafkaClient;
 
 /**
- * Factory class to return the correct type of AWS Kafka client.
+ * Factory class to create AWS MSK (Kafka) clients using common configuration.
  */
 public final class MSK2ClientFactory {
 
@@ -31,20 +29,14 @@ public final class MSK2ClientFactory {
     }
 
     /**
-     * Return the correct AWS Kafka client (based on remote vs local).
+     * Create an MSK (Kafka) client based on configuration.
      *
-     * @param  configuration configuration
-     * @return               MqClient
+     * @param  configuration The MSK configuration
+     * @return               Configured KafkaClient
      */
-    public static MSK2InternalClient getKafkaClient(MSK2Configuration configuration) {
-        if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) {
-            return new MSK2ClientOptimizedImpl(configuration);
-        } else if (Boolean.TRUE.equals(configuration.isUseProfileCredentialsProvider())) {
-            return new MSK2ClientProfileOptimizedImpl(configuration);
-        } else if (Boolean.TRUE.equals(configuration.isUseSessionCredentials())) {
-            return new MSK2ClientSessionTokenImpl(configuration);
-        } else {
-            return new MSK2ClientStandardImpl(configuration);
-        }
+    public static KafkaClient getKafkaClient(MSK2Configuration configuration) {
+        return AwsClientBuilderUtil.buildClient(
+                configuration,
+                KafkaClient::builder);
     }
 }
diff --git a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/MSK2InternalClient.java b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/MSK2InternalClient.java
deleted file mode 100644
index 9d43ecbd4482..000000000000
--- a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/MSK2InternalClient.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.msk.client;
-
-import software.amazon.awssdk.services.kafka.KafkaClient;
-
-/**
- * Manage the required actions of a Kafka client for either local or remote.
- */
-public interface MSK2InternalClient {
-
-    /**
-     * Returns an Kafka client after a factory method determines which one to return.
-     *
-     * @return KafkaClient KafkaClient
-     */
-    KafkaClient getKafkaClient();
-}
diff --git a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientOptimizedImpl.java b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientOptimizedImpl.java
deleted file mode 100644
index e6c048006c6f..000000000000
--- a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientOptimizedImpl.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.msk.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.msk.MSK2Configuration;
-import org.apache.camel.component.aws2.msk.client.MSK2InternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kafka.KafkaClient;
-import software.amazon.awssdk.services.kafka.KafkaClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS MSK client for all users to use (enabling temporary creds). 
This implementation is for remote instances
- * to manage the credentials on their own (eliminating credential rotations)
- */
-public class MSK2ClientOptimizedImpl implements MSK2InternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MSK2ClientOptimizedImpl.class);
-    private MSK2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public MSK2ClientOptimizedImpl(MSK2Configuration configuration) {
-        LOG.trace("Creating an AWS MSK client for an ec2 instance with IAM 
temporary credentials (normal for ec2s).");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kafka aws client that is used.
-     *
-     * @return Kafka Client.
-     */
-    @Override
-    public KafkaClient getKafkaClient() {
-        KafkaClient client = null;
-        KafkaClientBuilder clientBuilder = KafkaClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientProfileOptimizedImpl.java b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientProfileOptimizedImpl.java
deleted file mode 100644
index 13c84a8a245d..000000000000
--- a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientProfileOptimizedImpl.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.msk.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.msk.MSK2Configuration;
-import org.apache.camel.component.aws2.msk.client.MSK2InternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kafka.KafkaClient;
-import software.amazon.awssdk.services.kafka.KafkaClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS MSK client for all users to use (enabling temporary creds). 
This implementation is for remote instances
- * to manage the credentials on their own (eliminating credential rotations)
- */
-public class MSK2ClientProfileOptimizedImpl implements MSK2InternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MSK2ClientProfileOptimizedImpl.class);
-    private MSK2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public MSK2ClientProfileOptimizedImpl(MSK2Configuration configuration) {
-        LOG.trace("Creating an AWS MSK client for an ec2 instance with IAM 
temporary credentials (normal for ec2s).");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the Kafka aws client that is used.
-     *
-     * @return Kafka Client.
-     */
-    @Override
-    public KafkaClient getKafkaClient() {
-        KafkaClient client = null;
-        KafkaClientBuilder clientBuilder = KafkaClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
-        }
-        if (configuration.getProfileCredentialsName() != null) {
-            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder)
-                    
.credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName()));
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientSessionTokenImpl.java b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientSessionTokenImpl.java
deleted file mode 100644
index e146cbad9a74..000000000000
--- a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientSessionTokenImpl.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.msk.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.msk.MSK2Configuration;
-import org.apache.camel.component.aws2.msk.client.MSK2InternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kafka.KafkaClient;
-import software.amazon.awssdk.services.kafka.KafkaClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS MQ client for all users to use. This implementation is for 
local instances to use a static and solid
- * credential set.
- */
-public class MSK2ClientSessionTokenImpl implements MSK2InternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MSK2ClientSessionTokenImpl.class);
-    private MSK2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public MSK2ClientSessionTokenImpl(MSK2Configuration configuration) {
-        LOG.trace("Creating an AWS MSK manager using static credentials.");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the MQ AWS client that is used.
-     *
-     * @return Amazon MQ Client.
-     */
-    @Override
-    public KafkaClient getKafkaClient() {
-        KafkaClient client = null;
-        KafkaClientBuilder clientBuilder = KafkaClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-        boolean isClientConfigFound = false;
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            isClientConfigFound = true;
-        }
-        if (configuration.getAccessKey() != null && 
configuration.getSecretKey() != null
-                && configuration.getSessionToken() != null) {
-            AwsSessionCredentials cred = 
AwsSessionCredentials.create(configuration.getAccessKey(),
-                    configuration.getSecretKey(), 
configuration.getSessionToken());
-            if (isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder)
-                        
.credentialsProvider(StaticCredentialsProvider.create(cred));
-            } else {
-                clientBuilder = 
clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
-            }
-        } else {
-            if (!isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder);
-            }
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientStandardImpl.java b/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientStandardImpl.java
deleted file mode 100644
index a64a61f2fa11..000000000000
--- a/components/camel-aws/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/client/impl/MSK2ClientStandardImpl.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.msk.client.impl;
-
-import java.net.URI;
-
-import org.apache.camel.component.aws2.msk.MSK2Configuration;
-import org.apache.camel.component.aws2.msk.client.MSK2InternalClient;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kafka.KafkaClient;
-import software.amazon.awssdk.services.kafka.KafkaClientBuilder;
-import software.amazon.awssdk.utils.AttributeMap;
-
-/**
- * Manage an AWS MQ client for all users to use. This implementation is for 
local instances to use a static and solid
- * credential set.
- */
-public class MSK2ClientStandardImpl implements MSK2InternalClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MSK2ClientStandardImpl.class);
-    private MSK2Configuration configuration;
-
-    /**
-     * Constructor that uses the config file.
-     */
-    public MSK2ClientStandardImpl(MSK2Configuration configuration) {
-        LOG.trace("Creating an AWS MSK manager using static credentials.");
-        this.configuration = configuration;
-    }
-
-    /**
-     * Getting the MQ AWS client that is used.
-     *
-     * @return Amazon MQ Client.
-     */
-    @Override
-    public KafkaClient getKafkaClient() {
-        KafkaClient client = null;
-        KafkaClientBuilder clientBuilder = KafkaClient.builder();
-        ProxyConfiguration.Builder proxyConfig = null;
-        ApacheHttpClient.Builder httpClientBuilder = null;
-        boolean isClientConfigFound = false;
-        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
-            proxyConfig = ProxyConfiguration.builder();
-            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
-            proxyConfig.endpoint(proxyEndpoint);
-            httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
-            isClientConfigFound = true;
-        }
-        if (configuration.getAccessKey() != null && 
configuration.getSecretKey() != null) {
-            AwsBasicCredentials cred = 
AwsBasicCredentials.create(configuration.getAccessKey(), 
configuration.getSecretKey());
-            if (isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder)
-                        
.credentialsProvider(StaticCredentialsProvider.create(cred));
-            } else {
-                clientBuilder = 
clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
-            }
-        } else {
-            if (!isClientConfigFound) {
-                clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder);
-            }
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
-            clientBuilder = 
clientBuilder.region(Region.of(configuration.getRegion()));
-        }
-        if (configuration.isOverrideEndpoint()) {
-            
clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
-        }
-        if (configuration.isTrustAllCertificates()) {
-            if (httpClientBuilder == null) {
-                httpClientBuilder = ApacheHttpClient.builder();
-            }
-            SdkHttpClient ahc = 
httpClientBuilder.buildWithDefaults(AttributeMap
-                    .builder()
-                    .put(
-                            SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
-                            Boolean.TRUE)
-                    .build());
-            // set created http client to use instead of builder
-            clientBuilder.httpClient(ahc);
-            clientBuilder.httpClientBuilder(null);
-        }
-        client = clientBuilder.build();
-        return client;
-    }
-}
diff --git a/components/camel-aws/camel-aws2-msk/src/test/java/org/apache/camel/component/aws2/msk/MSK2ClientFactoryTest.java b/components/camel-aws/camel-aws2-msk/src/test/java/org/apache/camel/component/aws2/msk/MSK2ClientFactoryTest.java
index be2467ea8ea3..224dfb0bff57 100644
--- a/components/camel-aws/camel-aws2-msk/src/test/java/org/apache/camel/component/aws2/msk/MSK2ClientFactoryTest.java
+++ b/components/camel-aws/camel-aws2-msk/src/test/java/org/apache/camel/component/aws2/msk/MSK2ClientFactoryTest.java
@@ -17,44 +17,43 @@
 package org.apache.camel.component.aws2.msk;
 
 import org.apache.camel.component.aws2.msk.client.MSK2ClientFactory;
-import org.apache.camel.component.aws2.msk.client.MSK2InternalClient;
-import org.apache.camel.component.aws2.msk.client.impl.MSK2ClientOptimizedImpl;
-import org.apache.camel.component.aws2.msk.client.impl.MSK2ClientSessionTokenImpl;
-import org.apache.camel.component.aws2.msk.client.impl.MSK2ClientStandardImpl;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.kafka.KafkaClient;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class MSK2ClientFactoryTest {
 
     @Test
-    public void getStandardMSKClientDefault() {
-        MSK2Configuration msk2Configuration = new MSK2Configuration();
-        MSK2InternalClient mskClient = MSK2ClientFactory.getKafkaClient(msk2Configuration);
-        assertTrue(mskClient instanceof MSK2ClientStandardImpl);
+    public void getMskClientWithDefaultCredentials() {
+        MSK2Configuration configuration = new MSK2Configuration();
+        configuration.setUseDefaultCredentialsProvider(true);
+        configuration.setRegion("eu-west-1");
+        KafkaClient kafkaClient = MSK2ClientFactory.getKafkaClient(configuration);
+        assertNotNull(kafkaClient);
+        kafkaClient.close();
     }
 
     @Test
-    public void getStandardMSKClient() {
-        MSK2Configuration msk2Configuration = new MSK2Configuration();
-        msk2Configuration.setUseDefaultCredentialsProvider(false);
-        MSK2InternalClient mskClient = MSK2ClientFactory.getKafkaClient(msk2Configuration);
-        assertTrue(mskClient instanceof MSK2ClientStandardImpl);
+    public void getMskClientWithStaticCredentials() {
+        MSK2Configuration configuration = new MSK2Configuration();
+        configuration.setAccessKey("testAccessKey");
+        configuration.setSecretKey("testSecretKey");
+        configuration.setRegion("eu-west-1");
+        KafkaClient kafkaClient = MSK2ClientFactory.getKafkaClient(configuration);
+        assertNotNull(kafkaClient);
+        kafkaClient.close();
     }
 
     @Test
-    public void getMSKOptimizedMSKClient() {
-        MSK2Configuration msk2Configuration = new MSK2Configuration();
-        msk2Configuration.setUseDefaultCredentialsProvider(true);
-        MSK2InternalClient mskClient = MSK2ClientFactory.getKafkaClient(msk2Configuration);
-        assertTrue(mskClient instanceof MSK2ClientOptimizedImpl);
-    }
-
-    @Test
-    public void getMSKSessionTokenClient() {
-        MSK2Configuration msk2Configuration = new MSK2Configuration();
-        msk2Configuration.setUseSessionCredentials(true);
-        MSK2InternalClient mskClient = MSK2ClientFactory.getKafkaClient(msk2Configuration);
-        assertTrue(mskClient instanceof MSK2ClientSessionTokenImpl);
+    public void getMskClientWithEndpointOverride() {
+        MSK2Configuration configuration = new MSK2Configuration();
+        configuration.setUseDefaultCredentialsProvider(true);
+        configuration.setRegion("eu-west-1");
+        configuration.setOverrideEndpoint(true);
+        configuration.setUriEndpointOverride("http://localhost:4566");
+        KafkaClient kafkaClient = MSK2ClientFactory.getKafkaClient(configuration);
+        assertNotNull(kafkaClient);
+        kafkaClient.close();
     }
 }

Reply via email to