This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit fbd13e9bade399b939d2b6c6ac3f23b8b31102bf Author: Andrea Cosentino <[email protected]> AuthorDate: Tue Dec 16 10:33:07 2025 +0100 CAMEL-22786 - Camel-AWS: Extract common logic for clients instantiation in a separated module - AWS Kinesis Signed-off-by: Andrea Cosentino <[email protected]> --- components/camel-aws/camel-aws-common/pom.xml | 5 + .../component/aws/common/AwsClientBuilderUtil.java | 95 +++++++++++++++++ components/camel-aws/camel-aws2-kinesis/pom.xml | 4 + .../firehose/KinesisFirehose2Configuration.java | 3 +- .../aws2/firehose/KinesisFirehose2Endpoint.java | 2 +- .../client/KinesisFirehoseClientFactory.java | 28 ++--- .../client/KinesisFirehoseInternalClient.java | 32 ------ .../KinesisFirehoseClientIAMOptimizedImpl.java | 95 ----------------- ...nesisFirehoseClientIAMProfileOptimizedImpl.java | 100 ------------------ .../KinesisFirehoseClientSessionTokenImpl.java | 111 -------------------- .../impl/KinesisFirehoseClientStandardImpl.java | 109 ------------------- .../aws2/kinesis/Kinesis2Configuration.java | 3 +- .../component/aws2/kinesis/KinesisConnection.java | 4 +- .../kinesis/client/KinesisAsyncInternalClient.java | 31 ------ .../aws2/kinesis/client/KinesisClientFactory.java | 46 ++++----- .../aws2/kinesis/client/KinesisInternalClient.java | 32 ------ .../impl/KinesisAsyncClientIAMOptimizedImpl.java | 96 ----------------- .../KinesisAsyncClientIAMProfileOptimizedImpl.java | 101 ------------------ .../impl/KinesisAsyncClientSessionTokenImpl.java | 115 --------------------- .../impl/KinesisAsyncClientStandardImpl.java | 113 -------------------- .../client/impl/KinesisClientIAMOptimizedImpl.java | 94 ----------------- .../impl/KinesisClientIAMProfileOptimizedImpl.java | 99 ------------------ .../client/impl/KinesisClientSessionTokenImpl.java | 111 -------------------- .../client/impl/KinesisClientStandardImpl.java | 109 ------------------- .../firehose/KinesisFirehoseClientFactoryTest.java | 57 +++++----- .../aws2/kinesis/KinesisClientFactoryTest.java | 66 ++++++------ 26 files changed, 201 insertions(+), 1460 deletions(-) diff --git a/components/camel-aws/camel-aws-common/pom.xml b/components/camel-aws/camel-aws-common/pom.xml index 5b4c0e1d1b97..98625ae5e012 100644 --- a/components/camel-aws/camel-aws-common/pom.xml +++ b/components/camel-aws/camel-aws-common/pom.xml @@ -67,6 +67,11 @@ <artifactId>aws-core</artifactId> <version>${aws-java-sdk2-version}</version> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>netty-nio-client</artifactId> + <version>${aws-java-sdk2-version}</version> + </dependency> </dependencies> </project> diff --git a/components/camel-aws/camel-aws-common/src/main/java/org/apache/camel/component/aws/common/AwsClientBuilderUtil.java b/components/camel-aws/camel-aws-common/src/main/java/org/apache/camel/component/aws/common/AwsClientBuilderUtil.java index cf08b59fb76f..932aacec7d78 100644 --- a/components/camel-aws/camel-aws-common/src/main/java/org/apache/camel/component/aws/common/AwsClientBuilderUtil.java +++ b/components/camel-aws/camel-aws-common/src/main/java/org/apache/camel/component/aws/common/AwsClientBuilderUtil.java @@ -31,11 +31,14 @@ import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; import software.amazon.awssdk.core.SdkClient; +import software.amazon.awssdk.core.client.builder.SdkAsyncClientBuilder; import software.amazon.awssdk.core.client.builder.SdkSyncClientBuilder; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.utils.AttributeMap; @@ -143,6 +146,98 @@ public final class AwsClientBuilderUtil { return buildClient(config, builderSupplier, null); } + /** + * Build an AWS async client with the given configuration. + * + * @param config The common AWS configuration + * @param builderSupplier Supplier for the service-specific async client builder (e.g., + * KinesisAsyncClient::builder) + * @param serviceSpecificConfig Optional consumer for service-specific configuration + * @param <B> The builder type (must extend both AwsClientBuilder and SdkAsyncClientBuilder) + * @param <C> The client type + * @return The configured AWS async client + */ + @SuppressWarnings("unchecked") + public static <B extends AwsClientBuilder<B, C> & SdkAsyncClientBuilder<B, C>, C extends SdkClient> C buildAsyncClient( + AwsCommonConfiguration config, + Supplier<B> builderSupplier, + Consumer<B> serviceSpecificConfig) { + + B clientBuilder = builderSupplier.get(); + NettyNioAsyncHttpClient.Builder httpClientBuilder = null; + boolean httpClientConfigured = false; + + // 1. Configure proxy + if (ObjectHelper.isNotEmpty(config.getProxyHost()) + && ObjectHelper.isNotEmpty(config.getProxyPort())) { + LOG.trace("Configuring async proxy: {}:{}", config.getProxyHost(), config.getProxyPort()); + software.amazon.awssdk.http.nio.netty.ProxyConfiguration proxyConfig + = software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder() + .scheme(config.getProxyProtocol().toString()) + .host(config.getProxyHost()) + .port(config.getProxyPort()) + .build(); + httpClientBuilder = NettyNioAsyncHttpClient.builder().proxyConfiguration(proxyConfig); + httpClientConfigured = true; + } + + // 2. Configure credentials + AwsCredentialsProvider credentialsProvider = resolveCredentialsProvider(config); + if (credentialsProvider != null) { + clientBuilder.credentialsProvider(credentialsProvider); + } + + // 3. Apply HTTP client builder if configured (before trust all certs check) + if (httpClientConfigured) { + clientBuilder.httpClientBuilder(httpClientBuilder); + } + + // 4. Configure region + if (ObjectHelper.isNotEmpty(config.getRegion())) { + clientBuilder.region(Region.of(config.getRegion())); + } + + // 5. Configure endpoint override + if (config.isOverrideEndpoint() && ObjectHelper.isNotEmpty(config.getUriEndpointOverride())) { + clientBuilder.endpointOverride(URI.create(config.getUriEndpointOverride())); + } + + // 6. Configure trust all certificates + if (config.isTrustAllCertificates()) { + if (httpClientBuilder == null) { + httpClientBuilder = NettyNioAsyncHttpClient.builder(); + } + SdkAsyncHttpClient asyncHttpClient = httpClientBuilder.buildWithDefaults( + AttributeMap.builder() + .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE) + .build()); + clientBuilder.httpClient(asyncHttpClient); + clientBuilder.httpClientBuilder(null); + } + + // 7. Apply service-specific configuration + if (serviceSpecificConfig != null) { + serviceSpecificConfig.accept(clientBuilder); + } + + return clientBuilder.build(); + } + + /** + * Build an AWS async client with the given configuration, without service-specific configuration. + * + * @param config The common AWS configuration + * @param builderSupplier Supplier for the service-specific async client builder + * @param <B> The builder type + * @param <C> The client type + * @return The configured AWS async client + */ + public static <B extends AwsClientBuilder<B, C> & SdkAsyncClientBuilder<B, C>, C extends SdkClient> C buildAsyncClient( + AwsCommonConfiguration config, + Supplier<B> builderSupplier) { + return buildAsyncClient(config, builderSupplier, null); + } + /** * Resolve the appropriate credentials provider based on configuration. * <p> diff --git a/components/camel-aws/camel-aws2-kinesis/pom.xml b/components/camel-aws/camel-aws2-kinesis/pom.xml index ac572cbe67ba..4a7ca805451b 100644 --- a/components/camel-aws/camel-aws2-kinesis/pom.xml +++ b/components/camel-aws/camel-aws2-kinesis/pom.xml @@ -33,6 +33,10 @@ <description>Consuming and Producing data to AWS Kinesis Service</description> <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws-common</artifactId> + </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-support</artifactId> diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java index 3960113314d6..c2062cbe956c 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws2.firehose; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.aws.common.AwsCommonConfiguration; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; @@ -25,7 +26,7 @@ import software.amazon.awssdk.core.Protocol; import software.amazon.awssdk.services.firehose.FirehoseClient; @UriParams -public class KinesisFirehose2Configuration implements Cloneable { +public class KinesisFirehose2Configuration implements Cloneable, AwsCommonConfiguration { @UriPath(description = "Name of the stream") @Metadata(required = true) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java index d55f05501221..35e73ae69161 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java @@ -71,7 +71,7 @@ public class KinesisFirehose2Endpoint extends DefaultEndpoint implements Endpoin } kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() != null ? configuration.getAmazonKinesisFirehoseClient() - : KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration).getKinesisFirehoseClient(); + : KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration); } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseClientFactory.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseClientFactory.java index 0992a8ce8c67..d8b815e0a55c 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseClientFactory.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseClientFactory.java @@ -16,14 +16,12 @@ */ package org.apache.camel.component.aws2.firehose.client; +import org.apache.camel.component.aws.common.AwsClientBuilderUtil; import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration; -import org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientIAMOptimizedImpl; -import org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientIAMProfileOptimizedImpl; -import org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientSessionTokenImpl; -import org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientStandardImpl; +import software.amazon.awssdk.services.firehose.FirehoseClient; /** - * Factory class to return the correct type of AWS Kinesis client. + * Factory class to create AWS Kinesis Firehose clients using common configuration. */ public final class KinesisFirehoseClientFactory { @@ -31,20 +29,14 @@ public final class KinesisFirehoseClientFactory { } /** - * Return the correct aws Kinesis Firehose client (based on remote vs local). + * Create a Firehose client based on configuration. * - * @param configuration configuration - * @return FirehoseClient + * @param configuration The Firehose configuration + * @return Configured FirehoseClient */ - public static KinesisFirehoseInternalClient getKinesisFirehoseClient(KinesisFirehose2Configuration configuration) { - if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) { - return new KinesisFirehoseClientIAMOptimizedImpl(configuration); - } else if (Boolean.TRUE.equals(configuration.isUseProfileCredentialsProvider())) { - return new KinesisFirehoseClientIAMProfileOptimizedImpl(configuration); - } else if (Boolean.TRUE.equals(configuration.isUseSessionCredentials())) { - return new KinesisFirehoseClientSessionTokenImpl(configuration); - } else { - return new KinesisFirehoseClientStandardImpl(configuration); - } + public static FirehoseClient getKinesisFirehoseClient(KinesisFirehose2Configuration configuration) { + return AwsClientBuilderUtil.buildClient( + configuration, + FirehoseClient::builder); } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseInternalClient.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseInternalClient.java deleted file mode 100644 index 13e158a30adf..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/KinesisFirehoseInternalClient.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.firehose.client; - -import software.amazon.awssdk.services.firehose.FirehoseClient; - -/** - * Manage the required actions of a Kinesis Firehose client for either local or remote. - */ -public interface KinesisFirehoseInternalClient { - - /** - * Returns a Kinesis Firehose client after a factory method determines which one to return. - * - * @return FirehoseClient client - */ - FirehoseClient getKinesisFirehoseClient(); -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMOptimizedImpl.java deleted file mode 100644 index 9d7dba54eeaf..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMOptimizedImpl.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.firehose.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration; -import org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.firehose.FirehoseClient; -import software.amazon.awssdk.services.firehose.FirehoseClientBuilder; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis Firehose client for all users to use (enabling temporary creds). This implementation is for - * remote instances to manage the credentials on their own (eliminating credential rotations) - */ -public class KinesisFirehoseClientIAMOptimizedImpl implements KinesisFirehoseInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseClientIAMOptimizedImpl.class); - private KinesisFirehose2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisFirehoseClientIAMOptimizedImpl(KinesisFirehose2Configuration configuration) { - LOG.trace( - "Creating an AWS Kinesis Firehose client for an ec2 instance with IAM temporary credentials (normal for ec2s)."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis client that is used. - * - * @return Amazon Kinesis Client. - */ - @Override - public FirehoseClient getKinesisFirehoseClient() { - FirehoseClient client = null; - FirehoseClientBuilder clientBuilder = FirehoseClient.builder(); - ProxyConfiguration.Builder proxyConfig = null; - ApacheHttpClient.Builder httpClientBuilder = null; - - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - proxyConfig = ProxyConfiguration.builder(); - URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); - proxyConfig.endpoint(proxyEndpoint); - httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = ApacheHttpClient.builder(); - } - SdkHttpClient ahc = httpClientBuilder.buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - client = clientBuilder.build(); - return client; - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMProfileOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMProfileOptimizedImpl.java deleted file mode 100644 index a721694db653..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientIAMProfileOptimizedImpl.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.firehose.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration; -import org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.firehose.FirehoseClient; -import software.amazon.awssdk.services.firehose.FirehoseClientBuilder; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis Firehose client for all users to use (enabling temporary creds). This implementation is for - * remote instances to manage the credentials on their own (eliminating credential rotations) - */ -public class KinesisFirehoseClientIAMProfileOptimizedImpl implements KinesisFirehoseInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseClientIAMProfileOptimizedImpl.class); - private KinesisFirehose2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisFirehoseClientIAMProfileOptimizedImpl(KinesisFirehose2Configuration configuration) { - LOG.trace( - "Creating an AWS Kinesis Firehose client for an ec2 instance with IAM temporary credentials (normal for ec2s)."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis client that is used. - * - * @return Amazon Kinesis Client. - */ - @Override - public FirehoseClient getKinesisFirehoseClient() { - FirehoseClient client = null; - FirehoseClientBuilder clientBuilder = FirehoseClient.builder(); - ProxyConfiguration.Builder proxyConfig = null; - ApacheHttpClient.Builder httpClientBuilder = null; - - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - proxyConfig = ProxyConfiguration.builder(); - URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); - proxyConfig.endpoint(proxyEndpoint); - httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - if (configuration.getProfileCredentialsName() != null) { - clientBuilder = clientBuilder - .credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName())); - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = ApacheHttpClient.builder(); - } - SdkHttpClient ahc = httpClientBuilder.buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - client = clientBuilder.build(); - return client; - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientSessionTokenImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientSessionTokenImpl.java deleted file mode 100644 index 304246620054..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientSessionTokenImpl.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.firehose.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration; -import org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.firehose.FirehoseClient; -import software.amazon.awssdk.services.firehose.FirehoseClientBuilder; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis Firehose client for all users to use. This implementation is for local instances to use a - * static and solid credential set. - */ -public class KinesisFirehoseClientSessionTokenImpl implements KinesisFirehoseInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseClientSessionTokenImpl.class); - private KinesisFirehose2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisFirehoseClientSessionTokenImpl(KinesisFirehose2Configuration configuration) { - LOG.trace("Creating an AWS Kinesis Firehose manager using static credentials."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis Firehose client that is used. - * - * @return Amazon Kinesis Firehose Client. - */ - @Override - public FirehoseClient getKinesisFirehoseClient() { - FirehoseClient client = null; - FirehoseClientBuilder clientBuilder = FirehoseClient.builder(); - ProxyConfiguration.Builder proxyConfig = null; - ApacheHttpClient.Builder httpClientBuilder = null; - boolean isClientConfigFound = false; - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - proxyConfig = ProxyConfiguration.builder(); - URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); - proxyConfig.endpoint(proxyEndpoint); - httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); - isClientConfigFound = true; - } - if (configuration.getAccessKey() != null && configuration.getSecretKey() != null - && configuration.getSessionToken() != null) { - AwsSessionCredentials cred = AwsSessionCredentials.create(configuration.getAccessKey(), - configuration.getSecretKey(), configuration.getSessionToken()); - if (isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder) - .credentialsProvider(StaticCredentialsProvider.create(cred)); - } else { - clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred)); - } - } else { - if (!isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = ApacheHttpClient.builder(); - } - SdkHttpClient ahc = httpClientBuilder.buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - client = clientBuilder.build(); - return client; - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientStandardImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientStandardImpl.java deleted file mode 100644 index 1761b1e4c9fb..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/client/impl/KinesisFirehoseClientStandardImpl.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.firehose.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration; -import org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.firehose.FirehoseClient; -import software.amazon.awssdk.services.firehose.FirehoseClientBuilder; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis Firehose client for all users to use. This implementation is for local instances to use a - * static and solid credential set. - */ -public class KinesisFirehoseClientStandardImpl implements KinesisFirehoseInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseClientStandardImpl.class); - private KinesisFirehose2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisFirehoseClientStandardImpl(KinesisFirehose2Configuration configuration) { - LOG.trace("Creating an AWS Kinesis Firehose manager using static credentials."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis Firehose client that is used. - * - * @return Amazon Kinesis Firehose Client. - */ - @Override - public FirehoseClient getKinesisFirehoseClient() { - FirehoseClient client = null; - FirehoseClientBuilder clientBuilder = FirehoseClient.builder(); - ProxyConfiguration.Builder proxyConfig = null; - ApacheHttpClient.Builder httpClientBuilder = null; - boolean isClientConfigFound = false; - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - proxyConfig = ProxyConfiguration.builder(); - URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); - proxyConfig.endpoint(proxyEndpoint); - httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); - isClientConfigFound = true; - } - if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) { - AwsBasicCredentials cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey()); - if (isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder) - .credentialsProvider(StaticCredentialsProvider.create(cred)); - } else { - clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred)); - } - } else { - if (!isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = ApacheHttpClient.builder(); - } - SdkHttpClient ahc = httpClientBuilder.buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - client = clientBuilder.build(); - return client; - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java index 2a8273783b13..072e9c2dc04a 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws2.kinesis; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.aws.common.AwsCommonConfiguration; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; @@ -29,7 +30,7 @@ import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; @UriParams -public class Kinesis2Configuration implements Cloneable { +public class Kinesis2Configuration implements Cloneable, AwsCommonConfiguration { @UriPath(description = "Name of the stream") @Metadata(required = true) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java index b265c9632cd8..b02cab0b8b7e 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java @@ -41,7 +41,7 @@ public class KinesisConnection implements Closeable { if (Objects.isNull(kinesisClient)) { kinesisClient = endpoint.getConfiguration().getAmazonKinesisClient() != null ? endpoint.getConfiguration().getAmazonKinesisClient() - : KinesisClientFactory.getKinesisClient(endpoint.getConfiguration()).getKinesisClient(); + : KinesisClientFactory.getKinesisClient(endpoint.getConfiguration()); } return kinesisClient; } finally { @@ -55,7 +55,7 @@ public class KinesisConnection implements Closeable { if (Objects.isNull(kinesisAsyncClient)) { kinesisAsyncClient = endpoint.getConfiguration().getAmazonKinesisAsyncClient() != null ? endpoint.getConfiguration().getAmazonKinesisAsyncClient() - : KinesisClientFactory.getKinesisAsyncClient(endpoint.getConfiguration()).getKinesisAsyncClient(); + : KinesisClientFactory.getKinesisAsyncClient(endpoint.getConfiguration()); } return kinesisAsyncClient; } finally { diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java deleted file mode 100644 index 963fe277fb8a..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client; - -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; - -/** - * Manage the required actions of a Kinesis Async client for either local or remote. - */ -public interface KinesisAsyncInternalClient { - /** - * Returns a Kinesis Async client. - * - * @return KinesisAsyncClient client - */ - KinesisAsyncClient getKinesisAsyncClient(); -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java index e8233c67b5f7..66d65d709c83 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java @@ -16,11 +16,13 @@ */ package org.apache.camel.component.aws2.kinesis.client; +import org.apache.camel.component.aws.common.AwsClientBuilderUtil; import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; -import org.apache.camel.component.aws2.kinesis.client.impl.*; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; /** - * Factory class to return the correct type of AWS Kinesis client. + * Factory class to create AWS Kinesis clients using common configuration. */ public final class KinesisClientFactory { @@ -28,38 +30,26 @@ public final class KinesisClientFactory { } /** - * Return the correct aws Kinesis client (based on remote vs local). + * Create a Kinesis sync client based on configuration. * - * @param configuration configuration - * @return KinesisClient + * @param configuration The Kinesis configuration + * @return Configured KinesisClient */ - public static KinesisInternalClient getKinesisClient(Kinesis2Configuration configuration) { - if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) { - return new KinesisClientIAMOptimizedImpl(configuration); - } else if (Boolean.TRUE.equals(configuration.isUseProfileCredentialsProvider())) { - return new KinesisClientIAMProfileOptimizedImpl(configuration); - } else if (Boolean.TRUE.equals(configuration.isUseSessionCredentials())) { - return new KinesisClientSessionTokenImpl(configuration); - } else { - return new KinesisClientStandardImpl(configuration); - } + public static KinesisClient getKinesisClient(Kinesis2Configuration configuration) { + return AwsClientBuilderUtil.buildClient( + configuration, + KinesisClient::builder); } /** - * Return the standard aws Kinesis Async client. + * Create a Kinesis async client based on configuration. * - * @param configuration configuration - * @return KinesisAsyncClient + * @param configuration The Kinesis configuration + * @return Configured KinesisAsyncClient */ - public static KinesisAsyncInternalClient getKinesisAsyncClient(Kinesis2Configuration configuration) { - if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) { - return new KinesisAsyncClientIAMOptimizedImpl(configuration); - } else if (Boolean.TRUE.equals(configuration.isUseProfileCredentialsProvider())) { - return new KinesisAsyncClientIAMProfileOptimizedImpl(configuration); - } else if (Boolean.TRUE.equals(configuration.isUseSessionCredentials())) { - return new KinesisAsyncClientSessionTokenImpl(configuration); - } else { - return new KinesisAsyncClientStandardImpl(configuration); - } + public static KinesisAsyncClient getKinesisAsyncClient(Kinesis2Configuration configuration) { + return AwsClientBuilderUtil.buildAsyncClient( + configuration, + KinesisAsyncClient::builder); } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java deleted file mode 100644 index e22cf6d723b6..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client; - -import software.amazon.awssdk.services.kinesis.KinesisClient; - -/** - * Manage the required actions of a Kinesis client for either local or remote. - */ -public interface KinesisInternalClient { - - /** - * Returns a Kinesis client after a factory method determines which one to return. - * - * @return KinesisClient client - */ - KinesisClient getKinesisClient(); -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java deleted file mode 100644 index 0f68516d31d3..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; -import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.async.SdkAsyncHttpClient; -import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; -import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis Async client for all users to use (enabling temporary creds). This implementation is for remote - * instances to manage the credentials on their own (eliminating credential rotations) - */ -public class KinesisAsyncClientIAMOptimizedImpl implements KinesisAsyncInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientIAMOptimizedImpl.class); - private Kinesis2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisAsyncClientIAMOptimizedImpl(Kinesis2Configuration configuration) { - LOG.trace("Creating an AWS Kinesis Async client for an ec2 instance with IAM temporary credentials (normal for ec2s)."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis Async client that is used. - * - * @return Amazon Kinesis Async Client. - */ - @Override - public KinesisAsyncClient getKinesisAsyncClient() { - var clientBuilder = KinesisAsyncClient.builder(); - SdkAsyncHttpClient.Builder httpClientBuilder = null; - - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - var proxyConfig = ProxyConfiguration - .builder() - .scheme(configuration.getProxyProtocol().toString()) - .host(configuration.getProxyHost()) - .port(configuration.getProxyPort()) - .build(); - httpClientBuilder = NettyNioAsyncHttpClient - .builder() - .proxyConfiguration(proxyConfig); - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = NettyNioAsyncHttpClient.builder(); - } - SdkAsyncHttpClient ahc = httpClientBuilder - .buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - return clientBuilder.build(); - } - -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java deleted file mode 100644 index 3153f0684729..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; -import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.async.SdkAsyncHttpClient; -import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; -import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis Async client for all users to use (enabling temporary creds). This implementation is for remote - * instances to manage the credentials on their own (eliminating credential rotations) - */ -public class KinesisAsyncClientIAMProfileOptimizedImpl implements KinesisAsyncInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientIAMProfileOptimizedImpl.class); - private Kinesis2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisAsyncClientIAMProfileOptimizedImpl(Kinesis2Configuration configuration) { - LOG.trace("Creating an AWS Kinesis Async client for an ec2 instance with IAM temporary credentials (normal for ec2s)."); - this.configuration = configuration; - } - - /** - * Getting the KinesisAsync client that is used. - * - * @return Amazon Kinesis Async Client. - */ - @Override - public KinesisAsyncClient getKinesisAsyncClient() { - var clientBuilder = KinesisAsyncClient.builder(); - SdkAsyncHttpClient.Builder httpClientBuilder = null; - - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - var proxyConfig = ProxyConfiguration - .builder() - .scheme(configuration.getProxyProtocol().toString()) - .host(configuration.getProxyHost()) - .port(configuration.getProxyPort()) - .build(); - httpClientBuilder = NettyNioAsyncHttpClient - .builder() - .proxyConfiguration(proxyConfig); - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - if (configuration.getProfileCredentialsName() != null) { - clientBuilder = clientBuilder - .credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName())); - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = NettyNioAsyncHttpClient.builder(); - } - SdkAsyncHttpClient ahc = httpClientBuilder - .buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - return clientBuilder.build(); - } - -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientSessionTokenImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientSessionTokenImpl.java deleted file mode 100644 index d6db29ad541a..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientSessionTokenImpl.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client.impl; - -import java.net.URI; -import java.util.Objects; - -import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; -import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.async.SdkAsyncHttpClient; -import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; -import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Async Kinesis client for all users to use. This implementation is for local instances to use a static - * and solid credential set. - */ -public class KinesisAsyncClientSessionTokenImpl implements KinesisAsyncInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientSessionTokenImpl.class); - private Kinesis2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisAsyncClientSessionTokenImpl(Kinesis2Configuration configuration) { - LOG.trace("Creating an AWS Async Kinesis manager using static credentials."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis Async client that is used. - * - * @return Amazon Kinesis Async Client. - */ - @Override - public KinesisAsyncClient getKinesisAsyncClient() { - var clientBuilder = KinesisAsyncClient.builder(); - var isClientConfigFound = false; - SdkAsyncHttpClient.Builder httpClientBuilder = null; - - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - var proxyConfig = ProxyConfiguration - .builder() - .scheme(configuration.getProxyProtocol().toString()) - .host(configuration.getProxyHost()) - .port(configuration.getProxyPort()) - .build(); - httpClientBuilder = NettyNioAsyncHttpClient - .builder() - .proxyConfiguration(proxyConfig); - isClientConfigFound = true; - } - if (Objects.nonNull(configuration.getAccessKey()) && Objects.nonNull(configuration.getSecretKey()) - && Objects.nonNull(configuration.getSessionToken())) { - var cred = AwsSessionCredentials.create(configuration.getAccessKey(), configuration.getSecretKey(), - configuration.getSessionToken()); - if (isClientConfigFound) { - clientBuilder = clientBuilder - .httpClientBuilder(httpClientBuilder) - .credentialsProvider(StaticCredentialsProvider.create(cred)); - } else { - clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred)); - } - } else { - if (!isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(null); - } - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = NettyNioAsyncHttpClient.builder(); - } - SdkAsyncHttpClient ahc = httpClientBuilder - .buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - return clientBuilder.build(); - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java deleted file mode 100644 index b743e66ea6c3..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client.impl; - -import java.net.URI; -import java.util.Objects; - -import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; -import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.async.SdkAsyncHttpClient; -import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; -import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Async Kinesis client for all users to use. This implementation is for local instances to use a static - * and solid credential set. - */ -public class KinesisAsyncClientStandardImpl implements KinesisAsyncInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientStandardImpl.class); - private Kinesis2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisAsyncClientStandardImpl(Kinesis2Configuration configuration) { - LOG.trace("Creating an AWS Async Kinesis manager using static credentials."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis Async client that is used. - * - * @return Amazon Kinesis Async Client. - */ - @Override - public KinesisAsyncClient getKinesisAsyncClient() { - var clientBuilder = KinesisAsyncClient.builder(); - var isClientConfigFound = false; - SdkAsyncHttpClient.Builder httpClientBuilder = null; - - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - var proxyConfig = ProxyConfiguration - .builder() - .scheme(configuration.getProxyProtocol().toString()) - .host(configuration.getProxyHost()) - .port(configuration.getProxyPort()) - .build(); - httpClientBuilder = NettyNioAsyncHttpClient - .builder() - .proxyConfiguration(proxyConfig); - isClientConfigFound = true; - } - if (Objects.nonNull(configuration.getAccessKey()) && Objects.nonNull(configuration.getSecretKey())) { - var cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey()); - if (isClientConfigFound) { - clientBuilder = clientBuilder - .httpClientBuilder(httpClientBuilder) - .credentialsProvider(StaticCredentialsProvider.create(cred)); - } else { - clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred)); - } - } else { - if (!isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(null); - } - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = NettyNioAsyncHttpClient.builder(); - } - SdkAsyncHttpClient ahc = httpClientBuilder - .buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - return clientBuilder.build(); - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java deleted file mode 100644 index f4ce342dd2d7..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; -import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis client for all users to use (enabling temporary creds). This implementation is for remote - * instances to manage the credentials on their own (eliminating credential rotations) - */ -public class KinesisClientIAMOptimizedImpl implements KinesisInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisClientIAMOptimizedImpl.class); - private Kinesis2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisClientIAMOptimizedImpl(Kinesis2Configuration configuration) { - LOG.trace("Creating an AWS Kinesis client for an ec2 instance with IAM temporary credentials (normal for ec2s)."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis client that is used. - * - * @return Amazon Kinesis Client. - */ - @Override - public KinesisClient getKinesisClient() { - KinesisClient client = null; - KinesisClientBuilder clientBuilder = KinesisClient.builder(); - ProxyConfiguration.Builder proxyConfig = null; - ApacheHttpClient.Builder httpClientBuilder = null; - - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - proxyConfig = ProxyConfiguration.builder(); - URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); - proxyConfig.endpoint(proxyEndpoint); - httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = ApacheHttpClient.builder(); - } - SdkHttpClient ahc = httpClientBuilder.buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - client = clientBuilder.build(); - return client; - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMProfileOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMProfileOptimizedImpl.java deleted file mode 100644 index a5c9cd6689b1..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMProfileOptimizedImpl.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; -import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis client for all users to use (enabling temporary creds). This implementation is for remote - * instances to manage the credentials on their own (eliminating credential rotations) - */ -public class KinesisClientIAMProfileOptimizedImpl implements KinesisInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisClientIAMProfileOptimizedImpl.class); - private Kinesis2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisClientIAMProfileOptimizedImpl(Kinesis2Configuration configuration) { - LOG.trace("Creating an AWS Kinesis client for an ec2 instance with IAM temporary credentials (normal for ec2s)."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis client that is used. - * - * @return Amazon Kinesis Client. - */ - @Override - public KinesisClient getKinesisClient() { - KinesisClient client = null; - KinesisClientBuilder clientBuilder = KinesisClient.builder(); - ProxyConfiguration.Builder proxyConfig = null; - ApacheHttpClient.Builder httpClientBuilder = null; - - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - proxyConfig = ProxyConfiguration.builder(); - URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); - proxyConfig.endpoint(proxyEndpoint); - httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - if (configuration.getProfileCredentialsName() != null) { - clientBuilder = clientBuilder - .credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName())); - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = ApacheHttpClient.builder(); - } - SdkHttpClient ahc = httpClientBuilder.buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - client = clientBuilder.build(); - return client; - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientSessionTokenImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientSessionTokenImpl.java deleted file mode 100644 index 41eeffb9b576..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientSessionTokenImpl.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; -import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis client for all users to use. This implementation is for local instances to use a static and - * solid credential set. - */ -public class KinesisClientSessionTokenImpl implements KinesisInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisClientSessionTokenImpl.class); - private Kinesis2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisClientSessionTokenImpl(Kinesis2Configuration configuration) { - LOG.trace("Creating an AWS Kinesis manager using static credentials."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis client that is used. - * - * @return Amazon Kinesis Client. - */ - @Override - public KinesisClient getKinesisClient() { - KinesisClient client = null; - KinesisClientBuilder clientBuilder = KinesisClient.builder(); - ProxyConfiguration.Builder proxyConfig = null; - ApacheHttpClient.Builder httpClientBuilder = null; - boolean isClientConfigFound = false; - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - proxyConfig = ProxyConfiguration.builder(); - URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); - proxyConfig.endpoint(proxyEndpoint); - httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); - isClientConfigFound = true; - } - if (configuration.getAccessKey() != null && configuration.getSecretKey() != null - && configuration.getSessionToken() != null) { - AwsSessionCredentials cred = AwsSessionCredentials.create(configuration.getAccessKey(), - configuration.getSecretKey(), configuration.getSessionToken()); - if (isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder) - .credentialsProvider(StaticCredentialsProvider.create(cred)); - } else { - clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred)); - } - } else { - if (!isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = ApacheHttpClient.builder(); - } - SdkHttpClient ahc = httpClientBuilder.buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - client = clientBuilder.build(); - return client; - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java deleted file mode 100644 index 5858f852233e..000000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.client.impl; - -import java.net.URI; - -import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; -import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; -import software.amazon.awssdk.utils.AttributeMap; - -/** - * Manage an AWS Kinesis client for all users to use. This implementation is for local instances to use a static and - * solid credential set. - */ -public class KinesisClientStandardImpl implements KinesisInternalClient { - private static final Logger LOG = LoggerFactory.getLogger(KinesisClientStandardImpl.class); - private Kinesis2Configuration configuration; - - /** - * Constructor that uses the config file. - */ - public KinesisClientStandardImpl(Kinesis2Configuration configuration) { - LOG.trace("Creating an AWS Kinesis manager using static credentials."); - this.configuration = configuration; - } - - /** - * Getting the Kinesis client that is used. - * - * @return Amazon Kinesis Client. - */ - @Override - public KinesisClient getKinesisClient() { - KinesisClient client = null; - KinesisClientBuilder clientBuilder = KinesisClient.builder(); - ProxyConfiguration.Builder proxyConfig = null; - ApacheHttpClient.Builder httpClientBuilder = null; - boolean isClientConfigFound = false; - if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { - proxyConfig = ProxyConfiguration.builder(); - URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); - proxyConfig.endpoint(proxyEndpoint); - httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); - isClientConfigFound = true; - } - if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) { - AwsBasicCredentials cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey()); - if (isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder) - .credentialsProvider(StaticCredentialsProvider.create(cred)); - } else { - clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred)); - } - } else { - if (!isClientConfigFound) { - clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder); - } - } - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - clientBuilder = clientBuilder.region(Region.of(configuration.getRegion())); - } - if (configuration.isOverrideEndpoint()) { - clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride())); - } - if (configuration.isTrustAllCertificates()) { - if (httpClientBuilder == null) { - httpClientBuilder = ApacheHttpClient.builder(); - } - SdkHttpClient ahc = httpClientBuilder.buildWithDefaults(AttributeMap - .builder() - .put( - SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, - Boolean.TRUE) - .build()); - // set created http client to use instead of builder - clientBuilder.httpClient(ahc); - clientBuilder.httpClientBuilder(null); - } - client = clientBuilder.build(); - return client; - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseClientFactoryTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseClientFactoryTest.java index 75872d01c32c..97476556500f 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseClientFactoryTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseClientFactoryTest.java @@ -17,48 +17,43 @@ package org.apache.camel.component.aws2.firehose; import org.apache.camel.component.aws2.firehose.client.KinesisFirehoseClientFactory; -import org.apache.camel.component.aws2.firehose.client.KinesisFirehoseInternalClient; -import org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientIAMOptimizedImpl; -import org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientSessionTokenImpl; -import org.apache.camel.component.aws2.firehose.client.impl.KinesisFirehoseClientStandardImpl; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.firehose.FirehoseClient; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class KinesisFirehoseClientFactoryTest { @Test - public void getStandardFirehoseClientDefault() { - KinesisFirehose2Configuration kinesis2Configuration = new KinesisFirehose2Configuration(); - KinesisFirehoseInternalClient kinesisFirehoseClient - = KinesisFirehoseClientFactory.getKinesisFirehoseClient(kinesis2Configuration); - assertTrue(kinesisFirehoseClient instanceof KinesisFirehoseClientStandardImpl); + public void getFirehoseClientWithDefaultCredentials() { + KinesisFirehose2Configuration configuration = new KinesisFirehose2Configuration(); + configuration.setUseDefaultCredentialsProvider(true); + configuration.setRegion("eu-west-1"); + FirehoseClient firehoseClient = KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration); + assertNotNull(firehoseClient); + firehoseClient.close(); } @Test - public void getStandardFirehoseClient() { - KinesisFirehose2Configuration kinesis2Configuration = new KinesisFirehose2Configuration(); - kinesis2Configuration.setUseDefaultCredentialsProvider(false); - KinesisFirehoseInternalClient kinesisFirehoseClient - = KinesisFirehoseClientFactory.getKinesisFirehoseClient(kinesis2Configuration); - assertTrue(kinesisFirehoseClient instanceof KinesisFirehoseClientStandardImpl); + public void getFirehoseClientWithStaticCredentials() { + KinesisFirehose2Configuration configuration = new KinesisFirehose2Configuration(); + configuration.setAccessKey("testAccessKey"); + configuration.setSecretKey("testSecretKey"); + configuration.setRegion("eu-west-1"); + FirehoseClient firehoseClient = KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration); + assertNotNull(firehoseClient); + firehoseClient.close(); } @Test - public void getIAMOptimizedFirehoseClient() { - KinesisFirehose2Configuration kinesis2Configuration = new KinesisFirehose2Configuration(); - kinesis2Configuration.setUseDefaultCredentialsProvider(true); - KinesisFirehoseInternalClient kinesisFirehoseClient - = KinesisFirehoseClientFactory.getKinesisFirehoseClient(kinesis2Configuration); - assertTrue(kinesisFirehoseClient instanceof KinesisFirehoseClientIAMOptimizedImpl); - } - - @Test - public void getSessionTokenFirehoseClient() { - KinesisFirehose2Configuration kinesis2Configuration = new KinesisFirehose2Configuration(); - kinesis2Configuration.setUseSessionCredentials(true); - KinesisFirehoseInternalClient kinesisFirehoseClient - = KinesisFirehoseClientFactory.getKinesisFirehoseClient(kinesis2Configuration); - assertTrue(kinesisFirehoseClient instanceof KinesisFirehoseClientSessionTokenImpl); + public void getFirehoseClientWithEndpointOverride() { + KinesisFirehose2Configuration configuration = new KinesisFirehose2Configuration(); + configuration.setUseDefaultCredentialsProvider(true); + configuration.setRegion("eu-west-1"); + configuration.setOverrideEndpoint(true); + configuration.setUriEndpointOverride("http://localhost:4566"); + FirehoseClient firehoseClient = KinesisFirehoseClientFactory.getKinesisFirehoseClient(configuration); + assertNotNull(firehoseClient); + firehoseClient.close(); } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java index f9ad5b8e5a7f..d77fa7911985 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java @@ -16,60 +16,66 @@ */ package org.apache.camel.component.aws2.kinesis; -import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient; import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory; -import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient; -import org.apache.camel.component.aws2.kinesis.client.impl.*; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; class KinesisClientFactoryTest { @Test - void getStandardKinesisClientDefault() { + void getKinesisClientWithDefaultCredentials() { Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration(); - KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration); - assertTrue(kinesisClient instanceof KinesisClientStandardImpl); + kinesis2Configuration.setUseDefaultCredentialsProvider(true); + kinesis2Configuration.setRegion("eu-west-1"); + KinesisClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration); + assertNotNull(kinesisClient); + kinesisClient.close(); } @Test - void getStandardKinesisClient() { + void getKinesisClientWithStaticCredentials() { Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration(); - kinesis2Configuration.setUseDefaultCredentialsProvider(false); - KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration); - assertTrue(kinesisClient instanceof KinesisClientStandardImpl); + kinesis2Configuration.setAccessKey("testAccessKey"); + kinesis2Configuration.setSecretKey("testSecretKey"); + kinesis2Configuration.setRegion("eu-west-1"); + KinesisClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration); + assertNotNull(kinesisClient); + kinesisClient.close(); } @Test - void getIAMOptimizedKinesisClient() { + void getKinesisAsyncClientWithDefaultCredentials() { Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration(); kinesis2Configuration.setUseDefaultCredentialsProvider(true); - KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration); - assertTrue(kinesisClient instanceof KinesisClientIAMOptimizedImpl); - } - - @Test - void getSessionTokenKinesisClient() { - Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration(); - kinesis2Configuration.setUseSessionCredentials(true); - KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration); - assertTrue(kinesisClient instanceof KinesisClientSessionTokenImpl); + kinesis2Configuration.setRegion("eu-west-1"); + KinesisAsyncClient kinesisAsyncClient = KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration); + assertNotNull(kinesisAsyncClient); + kinesisAsyncClient.close(); } @Test - void getSessionTokenAsyncKinesisClient() { + void getKinesisAsyncClientWithStaticCredentials() { Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration(); - kinesis2Configuration.setUseSessionCredentials(true); - KinesisAsyncInternalClient kinesisClient = KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration); - assertTrue(kinesisClient instanceof KinesisAsyncClientSessionTokenImpl); + kinesis2Configuration.setAccessKey("testAccessKey"); + kinesis2Configuration.setSecretKey("testSecretKey"); + kinesis2Configuration.setRegion("eu-west-1"); + KinesisAsyncClient kinesisAsyncClient = KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration); + assertNotNull(kinesisAsyncClient); + kinesisAsyncClient.close(); } @Test - void getStandardKinesisAsyncClient() { + void getKinesisClientWithEndpointOverride() { Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration(); - kinesis2Configuration.setAsyncClient(true); - KinesisAsyncInternalClient kinesisClient = KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration); - assertTrue(kinesisClient instanceof KinesisAsyncClientStandardImpl); + kinesis2Configuration.setUseDefaultCredentialsProvider(true); + kinesis2Configuration.setRegion("eu-west-1"); + kinesis2Configuration.setOverrideEndpoint(true); + kinesis2Configuration.setUriEndpointOverride("http://localhost:4566"); + KinesisClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration); + assertNotNull(kinesisClient); + kinesisClient.close(); } }
