This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 6c7b664aca NIFI-8287 Upgraded SQS Processors to use AWS SDK 2
6c7b664aca is described below

commit 6c7b664aca4dad8bd35850e97cad77512699dc17
Author: Joe Gresock <[email protected]>
AuthorDate: Mon Apr 24 16:10:47 2023 -0400

    NIFI-8287 Upgraded SQS Processors to use AWS SDK 2
    
    This closes #7211
    
    Signed-off-by: David Handermann <[email protected]>
    (cherry picked from commit 3ef443987991dc619435c3c5c8d135aea91128b4)
---
 .../nifi-aws-abstract-processors/pom.xml           |  26 +-
 .../provider/PropertiesCredentialsProvider.java    |   0
 .../processors/aws/sqs/AbstractSQSProcessor.java   |  32 +-
 .../processors/aws/v2/AbstractAwsProcessor.java    | 440 +++++++++++++++++++++
 .../nifi/processors/aws/v2/AwsClientCache.java     |  37 ++
 .../nifi/processors/aws/v2/AwsClientDetails.java   |  59 +++
 .../nifi/processors/aws/v2/AwsClientProvider.java  |  31 ++
 .../apache/nifi/processors/aws/v2/RegionUtil.java  |  56 +++
 .../apache/nifi/processors/aws/sqs/DeleteSQS.java  |  45 ++-
 .../org/apache/nifi/processors/aws/sqs/GetSQS.java |  79 ++--
 .../org/apache/nifi/processors/aws/sqs/PutSQS.java |  81 ++--
 .../nifi/processors/aws/sqs/ITDeleteSQS.java       |  42 +-
 .../apache/nifi/processors/aws/sqs/ITPutSQS.java   |   6 +-
 .../nifi/processors/aws/sqs/TestDeleteSQS.java     |  30 +-
 .../apache/nifi/processors/aws/sqs/TestGetSQS.java |  92 +++--
 .../apache/nifi/processors/aws/sqs/TestPutSQS.java |  39 +-
 .../nifi-aws-bundle/nifi-aws-service-api/pom.xml   |   4 +
 17 files changed, 862 insertions(+), 237 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml
index b5368eddf8..3463caa204 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml
@@ -25,6 +25,24 @@
     <artifactId>nifi-aws-abstract-processors</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sdk-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sqs</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>software.amazon.awssdk</groupId>
+                    <artifactId>netty-nio-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-core</artifactId>
@@ -48,14 +66,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>software.amazon.awssdk</groupId>
-            <artifactId>s3</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>software.amazon.awssdk</groupId>
-            <artifactId>apache-client</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/PropertiesCredentialsProvider.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/PropertiesCredentialsProvider.java
similarity index 100%
rename from 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/PropertiesCredentialsProvider.java
rename to 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/PropertiesCredentialsProvider.java
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
index 2d9841a7f3..3ab6729a11 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
@@ -20,14 +20,11 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.SqsClientBuilder;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-
-public abstract class AbstractSQSProcessor extends 
AbstractAWSCredentialsProviderProcessor<AmazonSQSClient> {
+public abstract class AbstractSQSProcessor extends 
AbstractAwsProcessor<SqsClient, SqsClientBuilder> {
 
     public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
             .name("Batch Size")
@@ -45,26 +42,9 @@ public abstract class AbstractSQSProcessor extends 
AbstractAWSCredentialsProvide
             .required(true)
             .build();
 
-    /**
-     * Create client using credentials provider. This is the preferred way for 
creating clients
-     */
-    @Override
-    protected AmazonSQSClient createClient(final ProcessContext context, final 
AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
-        getLogger().info("Creating client using aws credentials provider ");
-
-        return new AmazonSQSClient(credentialsProvider, config);
-    }
-
-    /**
-     * Create client using AWSCredentials
-     *
-     * @deprecated use {@link #createClient(ProcessContext, 
AWSCredentialsProvider, ClientConfiguration)} instead
-     */
     @Override
-    protected AmazonSQSClient createClient(final ProcessContext context, final 
AWSCredentials credentials, final ClientConfiguration config) {
-        getLogger().info("Creating client using aws credentials ");
-
-        return new AmazonSQSClient(credentials, config);
+    protected SqsClientBuilder createClientBuilder(final ProcessContext 
context) {
+        return SqsClient.builder();
     }
 
 }
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java
new file mode 100644
index 0000000000..ceb05b9094
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
+import 
org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.ssl.SSLContextService;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.http.FileStoreTlsKeyManagersProvider;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.TlsKeyManagersProvider;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import javax.net.ssl.TrustManager;
+import java.io.File;
+import java.net.Proxy;
+import java.net.URI;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for AWS processors using the AWS v2 SDK.
+ *
+ * @param <T> client type
+ * @param <U> client builder type
+ *
+ * @see <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
+ */
+public abstract class AbstractAwsProcessor<T extends SdkClient, U extends 
AwsSyncClientBuilder<U, T> & AwsClientBuilder<U, T>>
+        extends AbstractProcessor implements VerifiableProcessor, 
AwsClientProvider<T> {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles are routed to success relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles are routed to failure relationship")
+            .build();
+
+    private static final Set<Relationship> relationships = 
Collections.unmodifiableSet(
+            new LinkedHashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))
+    );
+
+    public static final PropertyDescriptor CREDENTIALS_FILE = 
CredentialPropertyDescriptors.CREDENTIALS_FILE;
+
+    public static final PropertyDescriptor ACCESS_KEY = 
CredentialPropertyDescriptors.ACCESS_KEY;
+
+    public static final PropertyDescriptor SECRET_KEY = 
CredentialPropertyDescriptors.SECRET_KEY;
+
+    public static final PropertyDescriptor PROXY_HOST = new 
PropertyDescriptor.Builder()
+            .name("Proxy Host")
+            .description("Proxy host name or IP")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_HOST_PORT = new 
PropertyDescriptor.Builder()
+            .name("Proxy Host Port")
+            .description("Proxy host port")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_USERNAME = new 
PropertyDescriptor.Builder()
+            .name("proxy-user-name")
+            .displayName("Proxy Username")
+            .description("Proxy username")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("proxy-user-password")
+            .displayName("Proxy Password")
+            .description("Proxy password")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor REGION = new 
PropertyDescriptor.Builder()
+            .name("Region")
+            .required(true)
+            .allowableValues(RegionUtil.getAvailableRegions())
+            
.defaultValue(RegionUtil.createAllowableValue(Region.US_WEST_2).getValue())
+            .build();
+
+    public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("Specifies an optional SSL Context Service that, if 
provided, will be used to create connections")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new 
PropertyDescriptor.Builder()
+            .name("Endpoint Override URL")
+            .description("Endpoint URL to use instead of the AWS default 
including scheme, host, port, and path. " +
+                    "The AWS libraries select an endpoint URL based on the AWS 
region, but this property overrides " +
+                    "the selected endpoint URL, allowing use with other 
S3-compatible endpoints.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    /**
+     * AWS credentials provider service
+     *
+     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+     * @see  <a href="https://sdk.amazonaws.com/java/api/2.0.0/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
+     */
+    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = 
new PropertyDescriptor.Builder()
+            .name("AWS Credentials Provider service")
+            .displayName("AWS Credentials Provider Service")
+            .description("The Controller Service that is used to obtain AWS 
credentials provider")
+            .required(false)
+            .identifiesControllerService(AWSCredentialsProviderService.class)
+            .build();
+
+    protected static final String DEFAULT_USER_AGENT = "NiFi";
+
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
+
+    public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
+
+    protected volatile T client;
+
+    protected volatile Region region;
+
+    private final AwsClientCache<T> awsClientCache = new AwsClientCache<>();
+
+    /**
+     * Returns the relationships shared by processors derived from this class.
+     *
+     * @return the unmodifiable set holding {@code REL_SUCCESS} and {@code REL_FAILURE}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Checks property combinations that the per-property validators cannot express.
+     * <p>
+     * Access Key and Secret Key must be set together and cannot be combined with a Credentials File.
+     * Proxy Host and Proxy Host Port must be set together, as must Proxy Username and Proxy Password.
+     * A Proxy Username requires a Proxy Host, and the component-level Proxy Host cannot be combined
+     * with a Proxy Configuration Service.
+     *
+     * @param validationContext context providing the configured property values
+     * @return the results of the superclass validation followed by one invalid result per violated rule
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
+        final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
+        if (accessKeySet != secretKeySet) {
+            validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
+        }
+
+        final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet();
+        if ((secretKeySet || accessKeySet) && credentialsFileSet) {
+            validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
+        }
+
+        final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
+        final boolean proxyPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet();
+        final boolean proxyConfigServiceSet = validationContext.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet();
+
+        if (proxyHostSet != proxyPortSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
+        }
+
+        final boolean proxyUserSet = validationContext.getProperty(PROXY_USERNAME).isSet();
+        final boolean proxyPwdSet = validationContext.getProperty(PROXY_PASSWORD).isSet();
+
+        if (proxyUserSet != proxyPwdSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build());
+        }
+
+        // Proxy credentials are meaningless without a proxy to send them to.
+        if (proxyUserSet && !proxyHostSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy Username is set, Proxy Host must be set").build());
+        }
+
+        ProxyConfiguration.validateProxySpec(validationContext, validationResults, PROXY_SPECS);
+
+        // The condition tests Proxy Host, so the explanation names the host/port properties.
+        if (proxyHostSet && proxyConfigServiceSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy Configuration Service").valid(false)
+                    .explanation("Either Proxy Host and Proxy Host Port must be set or Proxy Configuration Service but not both").build());
+        }
+
+        return validationResults;
+    }
+
+    /**
+     * Closes the directly held client, if there is one, when the framework shuts down.
+     */
+    @OnShutdown
+    public void onShutDown() {
+        if (this.client == null) {
+            return;
+        }
+        this.client.close();
+    }
+
+    /**
+     * Empties the client cache when the processor is stopped, so that the next
+     * {@code getClient} call builds a client again.
+     */
+    @OnStopped
+    public void onStopped() {
+        this.awsClientCache.clearCache();
+    }
+
+    /**
+     * Verifies the configuration by building an AWS client from it.
+     *
+     * @param context The process context holding the configuration to verify
+     * @param verificationLogger logger that receives the failure details
+     * @param attributes FlowFile attributes supplied for verification; not used by this step
+     * @return a single "Create Client" result that is either successful or failed
+     */
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+
+        try {
+            // The client is only built to prove the configuration is usable, so release it right away.
+            final T verificationClient = createClient(context);
+            if (verificationClient != null) {
+                verificationClient.close();
+            }
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.SUCCESSFUL)
+                    .verificationStepName("Create Client")
+                    .explanation("Successfully created AWS Client")
+                    .build());
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to create AWS Client", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.FAILED)
+                    .verificationStepName("Create Client")
+                    .explanation("Failed to create AWS Client: " + e.getMessage())
+                    .build());
+        }
+
+        return results;
+    }
+
+    /**
+     * Builds a new AWS SDK client: obtains the service-specific builder, applies the
+     * common configuration to it and builds the client.
+     *
+     * @param context The process context
+     * @return The created client
+     */
+    @Override
+    public T createClient(final ProcessContext context) {
+        final U builder = createClientBuilder(context);
+        configureClientBuilder(builder, context);
+        return builder.build();
+    }
+
+    /**
+     * Applies the configuration common to all AWS v2 clients: user agent prefix, retry policy,
+     * HTTP client, region, endpoint override and credentials.
+     *
+     * @param clientBuilder the service-specific builder to configure
+     * @param context The process context
+     */
+    protected void configureClientBuilder(final U clientBuilder, final ProcessContext context) {
+        // Both override options are set in a single call: depending on the SDK version, every
+        // overrideConfiguration(Consumer) call may start from a fresh configuration, in which case
+        // a second call would discard the user agent prefix set by the first.
+        clientBuilder.overrideConfiguration(builder -> builder
+                .putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, DEFAULT_USER_AGENT)
+                .retryPolicy(RetryPolicy.none()));
+        clientBuilder.httpClient(createSdkHttpClient(context));
+
+        // Named differently from the "region" field to avoid shadowing it.
+        final Region configuredRegion = getRegion(context);
+        if (configuredRegion != null) {
+            clientBuilder.region(configuredRegion);
+        }
+        configureEndpoint(context, clientBuilder);
+
+        final AwsCredentialsProvider credentialsProvider = getCredentialsProvider(context);
+        clientBuilder.credentialsProvider(credentialsProvider);
+    }
+
+    /**
+     * Returns the cached AWS service client for the given client details, creating it from the
+     * context when the cache holds no client for them.
+     *
+     * @param context The process context
+     * @param awsClientDetails details of the AWS client, used as the cache key
+     * @return The cached or newly created client
+     */
+    protected T getClient(final ProcessContext context, final AwsClientDetails awsClientDetails) {
+        return awsClientCache.getOrCreateClient(context, awsClientDetails, this);
+    }
+
+    /**
+     * Returns the client for the region resolved from the context, using that region as the cache key.
+     *
+     * @param context The process context
+     * @return The cached or newly created client
+     */
+    protected T getClient(final ProcessContext context) {
+        return getClient(context, new AwsClientDetails(getRegion(context)));
+    }
+
+    /**
+     * Constructs the AWS SDK client builder and performs any service-specific configuration of it.
+     * Configuration common to all services is applied afterwards by {@code configureClientBuilder}.
+     *
+     * @param context The process context
+     * @return The SDK client builder
+     */
+    protected abstract U createClientBuilder(final ProcessContext context);
+
+    /**
+     * Resolves the region configured on the processor.
+     *
+     * @param context The process context
+     * @return the configured region, or {@code null} when the processor does not support the
+     *         Region property or no value is set
+     */
+    protected Region getRegion(final ProcessContext context) {
+        // Processors that do not expose the Region property have no region to resolve.
+        if (!getSupportedPropertyDescriptors().contains(REGION)) {
+            return null;
+        }
+
+        final String configuredValue = context.getProperty(REGION).getValue();
+        return configuredValue == null ? null : Region.of(configuredValue);
+    }
+
+    /**
+     * Applies the Endpoint Override URL to the builder when the processor supports the property
+     * and a non-empty value is configured.
+     * (per Amazon docs this should only be configured at client creation)
+     *
+     * @param context The process context
+     * @param clientBuilder the builder that receives the endpoint override
+     */
+    protected void configureEndpoint(final ProcessContext context, final U clientBuilder) {
+        if (!getSupportedPropertyDescriptors().contains(ENDPOINT_OVERRIDE)) {
+            return;
+        }
+
+        final String configuredEndpoint = context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue();
+        final String endpoint = StringUtils.trimToEmpty(configuredEndpoint);
+        if (endpoint.isEmpty()) {
+            return;
+        }
+
+        getLogger().info("Overriding endpoint with {}", endpoint);
+        clientBuilder.endpointOverride(URI.create(endpoint));
+    }
+
+    /**
+     * Resolves the {@link AwsCredentialsProvider} for the client. The provider of the configured
+     * AWS Credentials Provider Service is used when one is set; otherwise a provider is created
+     * from the processor's own credential properties.
+     *
+     * @param context the process context
+     * @return AwsCredentialsProvider the credential provider
+     */
+    protected AwsCredentialsProvider getCredentialsProvider(final ProcessContext context) {
+        final AWSCredentialsProviderService providerService =
+                context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(AWSCredentialsProviderService.class);
+
+        if (providerService == null) {
+            return createStaticCredentialsProvider(context);
+        }
+        return providerService.getAwsCredentialsProvider();
+    }
+
+    /**
+     * Creates a credentials provider from the processor's own credential properties.
+     * A configured Credentials File takes precedence, then the Access Key / Secret Key pair;
+     * anonymous credentials are used when neither is available.
+     *
+     * @param context the property context to read the credential properties from
+     * @return the credentials provider matching the configured properties
+     */
+    protected AwsCredentialsProvider createStaticCredentialsProvider(final PropertyContext context) {
+        final String configuredAccessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue();
+        final String configuredSecretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue();
+        final String credentialsFilePath = context.getProperty(CREDENTIALS_FILE).getValue();
+
+        final AwsCredentialsProvider provider;
+        if (credentialsFilePath != null) {
+            provider = new PropertiesCredentialsProvider(new File(credentialsFilePath));
+        } else if (configuredAccessKey != null && configuredSecretKey != null) {
+            provider = StaticCredentialsProvider.create(AwsBasicCredentials.create(configuredAccessKey, configuredSecretKey));
+        } else {
+            provider = AnonymousCredentialsProvider.create();
+        }
+        return provider;
+    }
+
+    /**
+     * Builds the Apache-based HTTP client handed to the SDK client builder.
+     * <p>
+     * The Communications Timeout is used for both the connection and the socket timeout, and the
+     * connection pool is sized to the processor's maximum concurrent tasks. TLS material is taken
+     * from the SSL Context Service when the processor supports it and one is configured. An HTTP
+     * proxy is taken from the component-level proxy properties or the Proxy Configuration Service.
+     *
+     * @param context The process context
+     * @return the configured HTTP client
+     */
+    private SdkHttpClient createSdkHttpClient(final ProcessContext context) {
+        final ApacheHttpClient.Builder builder = ApacheHttpClient.builder();
+
+        final int communicationsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        builder.connectionTimeout(Duration.ofMillis(communicationsTimeout));
+        builder.socketTimeout(Duration.ofMillis(communicationsTimeout));
+        builder.maxConnections(context.getMaxConcurrentTasks());
+
+        if (this.getSupportedPropertyDescriptors().contains(SSL_CONTEXT_SERVICE)) {
+            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+            if (sslContextService != null) {
+                final TrustManager[] trustManagers = new TrustManager[] { sslContextService.createTrustManager() };
+                builder.tlsTrustManagersProvider(() -> trustManagers);
+
+                // Only register key managers when a key store path is present; a service that
+                // carries just a trust store would otherwise fail on the missing path.
+                final String keyStoreFile = sslContextService.getKeyStoreFile();
+                if (StringUtils.isNotBlank(keyStoreFile)) {
+                    final TlsKeyManagersProvider keyManagersProvider = FileStoreTlsKeyManagersProvider
+                            .create(Path.of(keyStoreFile), sslContextService.getKeyStoreType(), sslContextService.getKeyStorePassword());
+                    builder.tlsKeyManagersProvider(keyManagersProvider);
+                }
+            }
+        }
+
+        final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
+            if (context.getProperty(PROXY_HOST).isSet()) {
+                final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+                final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+                final Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
+                final String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
+                final String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+                componentProxyConfig.setProxyServerHost(proxyHost);
+                componentProxyConfig.setProxyServerPort(proxyPort);
+                componentProxyConfig.setProxyUserName(proxyUsername);
+                componentProxyConfig.setProxyUserPassword(proxyPassword);
+                return componentProxyConfig;
+            } else if (context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet()) {
+                final ProxyConfigurationService configurationService = context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
+                return configurationService.getConfiguration();
+            }
+            return ProxyConfiguration.DIRECT_CONFIGURATION;
+        });
+
+        if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+            // The scheme is required: java.net.URI parses a bare "host:port" as an opaque URI whose
+            // scheme is the host name (no host or port component), and rejects it outright when the
+            // host is an IP address.
+            final URI proxyEndpoint = URI.create(String.format("http://%s:%s", proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort()));
+            final software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyConfigBuilder = software.amazon.awssdk.http.apache.ProxyConfiguration.builder()
+                    .endpoint(proxyEndpoint);
+
+            if (proxyConfig.hasCredential()) {
+                proxyConfigBuilder.username(proxyConfig.getProxyUserName());
+                proxyConfigBuilder.password(proxyConfig.getProxyUserPassword());
+            }
+            builder.proxyConfiguration(proxyConfigBuilder.build());
+        }
+
+        return builder.build();
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientCache.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientCache.java
new file mode 100644
index 0000000000..4d98583f55
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientCache.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.nifi.processor.ProcessContext;
+import software.amazon.awssdk.core.SdkClient;
+
+/**
+ * Cache of AWS SDK clients keyed by {@link AwsClientDetails}.
+ *
+ * @param <T> client type
+ */
+public class AwsClientCache<T extends SdkClient> {
+
+    private final Cache<AwsClientDetails, T> clientCache = Caffeine.newBuilder().build();
+
+    /**
+     * Returns the client cached for the given details, asking the provider to create one when
+     * the cache holds none.
+     *
+     * @param context The process context passed to the provider when a client has to be created
+     * @param clientDetails the cache key
+     * @param provider creates the client on a cache miss
+     * @return the cached or newly created client
+     */
+    public T getOrCreateClient(final ProcessContext context, final AwsClientDetails clientDetails, final AwsClientProvider<T> provider) {
+        return clientCache.get(clientDetails, ignored -> provider.createClient(context));
+    }
+
+    /**
+     * Removes every client from the cache and closes it, so that the resources held by the
+     * clients are released instead of being left open once the cache no longer references them.
+     */
+    public void clearCache() {
+        // Remove each entry before closing it so that a closed client is never handed out.
+        for (final AwsClientDetails clientDetails : clientCache.asMap().keySet()) {
+            final T removedClient = clientCache.asMap().remove(clientDetails);
+            if (removedClient != null) {
+                removedClient.close();
+            }
+        }
+        clientCache.invalidateAll();
+        clientCache.cleanUp();
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientDetails.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientDetails.java
new file mode 100644
index 0000000000..b8148d6093
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientDetails.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import software.amazon.awssdk.regions.Region;
+
+import java.util.Objects;
+
+/**
+ * Holds the AWS client details that distinguish the various AWS clients stored in the cache.
+ * Instances serve as the cache key for {@link AwsClientCache}.
+ * Only the region is part of the key, since the region value may come from the FlowFile attributes.
+ */
+public class AwsClientDetails {
+    private Region region;
+
+    public AwsClientDetails(Region region) {
+        this.region = region;
+    }
+
+    /**
+     * @return the region identifying the client
+     */
+    public Region getRegion() {
+        return region;
+    }
+
+    /**
+     * @param region the region identifying the client
+     */
+    public void setRegion(final Region region) {
+        this.region = region;
+    }
+
+    /**
+     * Two instances are equal when they are of exactly the same class and hold equal regions.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        final boolean sameClass = o != null && getClass() == o.getClass();
+        return sameClass && Objects.equals(region, ((AwsClientDetails) o).region);
+    }
+
+    /**
+     * Hash code derived from the region only, consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(region);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientProvider.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientProvider.java
new file mode 100644
index 0000000000..bea56727f8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import org.apache.nifi.processor.ProcessContext;
+import software.amazon.awssdk.core.SdkClient;
+
+/**
+ * Factory for AWS SDK clients.
+ *
+ * @param <T> type of AWS SDK client created by the provider
+ */
+@FunctionalInterface
+public interface AwsClientProvider<T extends SdkClient> {
+
+    /**
+     * Creates an AWS client using the process context.
+     *
+     * @param context process context
+     * @return AWS client
+     */
+    T createClient(final ProcessContext context);
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java
new file mode 100644
index 0000000000..45814f5c93
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import org.apache.nifi.components.AllowableValue;
+import software.amazon.awssdk.regions.Region;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Helper methods for presenting AWS regions as AllowableValues.
+ */
+public abstract class RegionUtil {
+
+    /**
+     * Builds the AllowableValue for a single AWS region.
+     * The region metadata description is used as the label when metadata is available; otherwise the region id is used.
+     *
+     * @param region An AWS region
+     * @return An AllowableValue for the region
+     */
+    public static AllowableValue createAllowableValue(final Region region) {
+        final String label;
+        if (region.metadata() == null) {
+            label = region.id();
+        } else {
+            label = region.metadata().description();
+        }
+        return new AllowableValue(region.id(), label, "AWS Region Code : " + region.id());
+    }
+
+    /**
+     * Converts every region reported by {@link Region#regions()} and orders the result by display name.
+     *
+     * @return All available regions as AllowableValues.
+     */
+    public static AllowableValue[] getAvailableRegions() {
+        final List<AllowableValue> regionValues = new ArrayList<>();
+        Region.regions().forEach(region -> regionValues.add(createAllowableValue(region)));
+        Collections.sort(regionValues, Comparator.comparing(AllowableValue::getDisplayName));
+        return regionValues.toArray(new AllowableValue[0]);
+    }
+
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index 6bc858cfb8..e5e3cc87b3 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -16,11 +16,6 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -35,11 +30,14 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import 
software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
 
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 @SupportsBatching
 @SeeAlso({GetSQS.class, PutSQS.class})
@@ -75,25 +73,26 @@ public class DeleteSQS extends AbstractSQSProcessor {
 
         final String queueUrl = 
context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
 
-        final AmazonSQSClient client = getClient(context);
-        final DeleteMessageBatchRequest request = new 
DeleteMessageBatchRequest();
-        request.setQueueUrl(queueUrl);
+        final SqsClient client = getClient(context);
+
+        final String receiptHandle = 
context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue();
+        final String entryId = 
flowFile.getAttribute(CoreAttributes.UUID.key());
+        final DeleteMessageBatchRequestEntry entry = 
DeleteMessageBatchRequestEntry.builder()
+                .receiptHandle(receiptHandle)
+                .id(entryId)
+                .build();
 
-        final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
-        final DeleteMessageBatchRequestEntry entry = new 
DeleteMessageBatchRequestEntry();
-        String receiptHandle = 
context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue();
-        entry.setReceiptHandle(receiptHandle);
-        String entryId = flowFile.getAttribute(CoreAttributes.UUID.key());
-        entry.setId(entryId);
-        entries.add(entry);
-        request.setEntries(entries);
+        final DeleteMessageBatchRequest request = 
DeleteMessageBatchRequest.builder()
+                .queueUrl(queueUrl)
+                .entries(entry)
+                .build();
 
         try {
-            DeleteMessageBatchResult response = 
client.deleteMessageBatch(request);
+            DeleteMessageBatchResponse response = 
client.deleteMessageBatch(request);
 
             // check for errors
-            if (!response.getFailed().isEmpty()) {
-                throw new 
ProcessException(response.getFailed().get(0).toString());
+            if (!response.failed().isEmpty()) {
+                throw new 
ProcessException(response.failed().get(0).toString());
             }
 
             getLogger().info("Successfully deleted message from SQS for {}", 
new Object[] { flowFile });
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index 1ffb68d023..e6f197829b 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -16,13 +16,7 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -38,11 +32,17 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import 
software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
+import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -136,28 +136,29 @@ public class GetSQS extends AbstractSQSProcessor {
         final String queueUrl = 
context.getProperty(DYNAMIC_QUEUE_URL).evaluateAttributeExpressions()
                 .getValue();
 
-        final AmazonSQSClient client = getClient(context);
+        final SqsClient client = getClient(context);
 
-        final ReceiveMessageRequest request = new ReceiveMessageRequest();
-        request.setAttributeNames(Collections.singleton("All"));
-        request.setMessageAttributeNames(Collections.singleton("All"));
-        
request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
-        
request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
-        request.setQueueUrl(queueUrl);
-        
request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue());
+        final ReceiveMessageRequest request = ReceiveMessageRequest.builder()
+                .attributeNames(QueueAttributeName.ALL)
+                .messageAttributeNames("All")
+                
.maxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger())
+                
.visibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue())
+                .queueUrl(queueUrl)
+                
.waitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue())
+                .build();
 
         final Charset charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
 
-        final ReceiveMessageResult result;
+        final ReceiveMessageResponse response;
         try {
-            result = client.receiveMessage(request);
+            response = client.receiveMessage(request);
         } catch (final Exception e) {
             getLogger().error("Failed to receive messages from Amazon SQS due 
to {}", new Object[]{e});
             context.yield();
             return;
         }
 
-        final List<Message> messages = result.getMessages();
+        final List<Message> messages = response.messages();
         if (messages.isEmpty()) {
             context.yield();
             return;
@@ -169,26 +170,21 @@ public class GetSQS extends AbstractSQSProcessor {
             FlowFile flowFile = session.create();
 
             final Map<String, String> attributes = new HashMap<>();
-            for (final Map.Entry<String, String> entry : 
message.getAttributes().entrySet()) {
+            for (final Map.Entry<MessageSystemAttributeName, String> entry : 
message.attributes().entrySet()) {
                 attributes.put("sqs." + entry.getKey(), entry.getValue());
             }
 
-            for (final Map.Entry<String, MessageAttributeValue> entry : 
message.getMessageAttributes().entrySet()) {
-                attributes.put("sqs." + entry.getKey(), 
entry.getValue().getStringValue());
+            for (final Map.Entry<String, MessageAttributeValue> entry : 
message.messageAttributes().entrySet()) {
+                attributes.put("sqs." + entry.getKey(), 
entry.getValue().stringValue());
             }
 
-            attributes.put("hash.value", message.getMD5OfBody());
+            attributes.put("hash.value", message.md5OfBody());
             attributes.put("hash.algorithm", "md5");
-            attributes.put("sqs.message.id", message.getMessageId());
-            attributes.put("sqs.receipt.handle", message.getReceiptHandle());
+            attributes.put("sqs.message.id", message.messageId());
+            attributes.put("sqs.receipt.handle", message.receiptHandle());
 
             flowFile = session.putAllAttributes(flowFile, attributes);
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    out.write(message.getBody().getBytes(charset));
-                }
-            });
+            flowFile = session.write(flowFile, out -> 
out.write(message.body().getBytes(charset)));
 
             session.transfer(flowFile, REL_SUCCESS);
             session.getProvenanceReporter().receive(flowFile, queueUrl);
@@ -203,18 +199,19 @@ public class GetSQS extends AbstractSQSProcessor {
         }
     }
 
-    private void deleteMessages(final AmazonSQSClient client, final String 
queueUrl, final List<Message> messages) {
-        final DeleteMessageBatchRequest deleteRequest = new 
DeleteMessageBatchRequest();
-        deleteRequest.setQueueUrl(queueUrl);
+    private void deleteMessages(final SqsClient client, final String queueUrl, 
final List<Message> messages) {
         final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new 
ArrayList<>();
         for (final Message message : messages) {
-            final DeleteMessageBatchRequestEntry entry = new 
DeleteMessageBatchRequestEntry();
-            entry.setId(message.getMessageId());
-            entry.setReceiptHandle(message.getReceiptHandle());
+            final DeleteMessageBatchRequestEntry entry = 
DeleteMessageBatchRequestEntry.builder()
+                    .id(message.messageId())
+                    .receiptHandle(message.receiptHandle())
+                    .build();
             deleteRequestEntries.add(entry);
         }
-
-        deleteRequest.setEntries(deleteRequestEntries);
+        final DeleteMessageBatchRequest deleteRequest = 
DeleteMessageBatchRequest.builder()
+                .queueUrl(queueUrl)
+                .entries(deleteRequestEntries)
+                .build();
 
         try {
             client.deleteMessageBatch(deleteRequest);
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
index a72d1b222b..6667673cbd 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
@@ -16,17 +16,6 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -38,16 +27,25 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
 
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.SendMessageBatchResult;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 @SupportsBatching
 @SeeAlso({ GetSQS.class, DeleteSQS.class })
@@ -58,6 +56,7 @@ import 
com.amazonaws.services.sqs.model.SendMessageBatchResult;
         description = "Allows the user to add key/value pairs as Message 
Attributes by adding a property whose name will become the name of "
         + "the Message Attribute and value will become the value of the 
Message Attribute", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 public class PutSQS extends AbstractSQSProcessor {
+    private static final String STRING_DATA_TYPE = "String";
 
     public static final PropertyDescriptor DELAY = new 
PropertyDescriptor.Builder()
             .name("Delay")
@@ -127,53 +126,43 @@ public class PutSQS extends AbstractSQSProcessor {
         }
 
         final long startNanos = System.nanoTime();
-        final AmazonSQSClient client = getClient(context);
-        final SendMessageBatchRequest request = new SendMessageBatchRequest();
+        final SqsClient client = getClient(context);
         final String queueUrl = 
context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
-        request.setQueueUrl(queueUrl);
-
-        final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
 
-        final SendMessageBatchRequestEntry entry = new 
SendMessageBatchRequestEntry();
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         session.exportTo(flowFile, baos);
         final String flowFileContent = baos.toString();
-        entry.setMessageBody(flowFileContent);
-        entry.setId(flowFile.getAttribute("uuid"));
-
-        if (context.getProperty(MESSAGEGROUPID).isSet()) {
-            entry.setMessageGroupId(context.getProperty(MESSAGEGROUPID)
-                    .evaluateAttributeExpressions(flowFile)
-                    .getValue());
-        }
-
-        if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) {
-            
entry.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID)
-                    .evaluateAttributeExpressions(flowFile)
-                    .getValue());
-        }
 
         final Map<String, MessageAttributeValue> messageAttributes = new 
HashMap<>();
 
         for (final PropertyDescriptor descriptor : userDefinedProperties) {
-            final MessageAttributeValue mav = new MessageAttributeValue();
-            mav.setDataType("String");
-            
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
+            final MessageAttributeValue mav = MessageAttributeValue.builder()
+                    .dataType(STRING_DATA_TYPE)
+                    
.stringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue())
+                    .build();
             messageAttributes.put(descriptor.getName(), mav);
         }
 
-        entry.setMessageAttributes(messageAttributes);
-        
entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
-        entries.add(entry);
+        final SendMessageBatchRequestEntry entry = 
SendMessageBatchRequestEntry.builder()
+                .messageBody(flowFileContent)
+                
.messageGroupId(context.getProperty(MESSAGEGROUPID).evaluateAttributeExpressions(flowFile).getValue())
+                
.messageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID).evaluateAttributeExpressions(flowFile).getValue())
+                .id(flowFile.getAttribute(CoreAttributes.UUID.key()))
+                .messageAttributes(messageAttributes)
+                
.delaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue())
+                .build();
 
-        request.setEntries(entries);
+        final SendMessageBatchRequest request = 
SendMessageBatchRequest.builder()
+                .queueUrl(queueUrl)
+                .entries(entry)
+                .build();
 
         try {
-            SendMessageBatchResult response = client.sendMessageBatch(request);
+            SendMessageBatchResponse response = 
client.sendMessageBatch(request);
 
             // check for errors
-            if (!response.getFailed().isEmpty()) {
-                throw new 
ProcessException(response.getFailed().get(0).toString());
+            if (!response.failed().isEmpty()) {
+                throw new 
ProcessException(response.failed().get(0).toString());
             }
         } catch (final Exception e) {
             getLogger().error("Failed to send messages to Amazon SQS due to 
{}; routing to failure", new Object[]{e});
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java
index 301222488c..c8752e9eb1 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java
@@ -16,16 +16,18 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
-import com.amazonaws.auth.PropertiesCredentials;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-import com.amazonaws.services.sqs.model.SendMessageResult;
+import 
org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
 
 import java.io.File;
 import java.io.IOException;
@@ -39,26 +41,34 @@ public class ITDeleteSQS {
     private final String CREDENTIALS_FILE = System.getProperty("user.home") + 
"/aws-credentials.properties";
     private final String TEST_QUEUE_URL = 
"https://sqs.us-west-2.amazonaws.com/123456789012/nifi-test-queue";;
     private final String TEST_REGION = "us-west-2";
-    AmazonSQSClient sqsClient = null;
+    SqsClient sqsClient = null;
 
     @BeforeEach
     public void setUp() throws IOException {
-        PropertiesCredentials credentials = new PropertiesCredentials(new 
File(CREDENTIALS_FILE));
-        sqsClient = new AmazonSQSClient(credentials);
-        sqsClient.withRegion(Regions.fromName(TEST_REGION));
+        sqsClient = SqsClient.builder()
+                .region(Region.of(TEST_REGION))
+                .credentialsProvider(() -> new 
PropertiesCredentialsProvider(new File(CREDENTIALS_FILE)).resolveCredentials())
+                .build();
     }
 
     @Test
     public void testSimpleDelete() {
         // Setup - put one message in queue
-        SendMessageResult sendMessageResult = 
sqsClient.sendMessage(TEST_QUEUE_URL, "Test message");
-        assertEquals(200, 
sendMessageResult.getSdkHttpMetadata().getHttpStatusCode());
+        final SendMessageRequest sendMessageRequest = 
SendMessageRequest.builder()
+                .queueUrl(TEST_QUEUE_URL)
+                .messageBody("Test message")
+                .build();
+        SendMessageResponse sendMessageResult = 
sqsClient.sendMessage(sendMessageRequest);
+        assertEquals(200, sendMessageResult.sdkHttpResponse().statusCode());
 
         // Setup - receive message to get receipt handle
-        ReceiveMessageResult receiveMessageResult = 
sqsClient.receiveMessage(TEST_QUEUE_URL);
-        assertEquals(200, 
receiveMessageResult.getSdkHttpMetadata().getHttpStatusCode());
-        Message deleteMessage = receiveMessageResult.getMessages().get(0);
-        String receiptHandle = deleteMessage.getReceiptHandle();
+        final ReceiveMessageRequest receiveMessageRequest = 
ReceiveMessageRequest.builder()
+                .queueUrl(TEST_QUEUE_URL)
+                .build();
+        ReceiveMessageResponse receiveMessageResult = 
sqsClient.receiveMessage(receiveMessageRequest);
+        assertEquals(200, receiveMessageResult.sdkHttpResponse().statusCode());
+        Message deleteMessage = receiveMessageResult.messages().get(0);
+        String receiptHandle = deleteMessage.receiptHandle();
 
         // Test - delete message with DeleteSQS
         final TestRunner runner = TestRunners.newTestRunner(new DeleteSQS());
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java
index 2b54350c43..d63a8605ad 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java
@@ -16,12 +16,12 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
-import com.amazonaws.regions.Regions;
 import 
org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
 import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.regions.Region;
 
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -34,7 +34,7 @@ public class ITPutSQS {
 
     private final String CREDENTIALS_FILE = System.getProperty("user.home") + 
"/aws-credentials.properties";
     private final String QUEUE_URL = 
"https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000";;
-    private final String REGION = Regions.US_WEST_2.getName();
+    private final String REGION = Region.US_WEST_2.id();
 
     private final String VPCE_QUEUE_URL = 
"https://vpce-1234567890abcdefg-12345678.sqs.us-west-2.vpce.amazonaws.com/123456789012/test-queue";;
     private final String VPCE_ENDPOINT_OVERRIDE = 
"https://vpce-1234567890abcdefg-12345678.sqs.us-west-2.vpce.amazonaws.com";;
@@ -91,7 +91,7 @@ public class ITPutSQS {
 
         final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
         runner.setProperty(PutSQS.CREDENTIALS_FILE, 
System.getProperty("user.home") + "/aws-credentials.properties");
-        runner.setProperty(PutSQS.REGION, Regions.US_WEST_2.getName());
+        runner.setProperty(PutSQS.REGION, Region.US_WEST_2.id());
         runner.setProperty(PutSQS.QUEUE_URL, VPCE_QUEUE_URL);
         runner.setProperty(PutSQS.ENDPOINT_OVERRIDE, VPCE_ENDPOINT_OVERRIDE);
 
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
index 744f47876d..0ee3b13da0 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
@@ -16,10 +16,6 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.AmazonSQSException;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -27,8 +23,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,18 +38,19 @@ public class TestDeleteSQS {
 
     private TestRunner runner = null;
     private DeleteSQS mockDeleteSQS = null;
-    private AmazonSQSClient mockSQSClient = null;
+    private SqsClient mockSQSClient = null;
 
     @BeforeEach
     public void setUp() {
-        mockSQSClient = Mockito.mock(AmazonSQSClient.class);
-        DeleteMessageBatchResult mockResponse = 
Mockito.mock(DeleteMessageBatchResult.class);
-        
Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any())).thenReturn(mockResponse);
-        Mockito.when(mockResponse.getFailed()).thenReturn(new ArrayList<>());
+        mockSQSClient = Mockito.mock(SqsClient.class);
+        DeleteMessageBatchResponse mockResponse = 
DeleteMessageBatchResponse.builder()
+                .failed(Collections.emptyList())
+                .build();
+        
Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class))).thenReturn(mockResponse);
         mockDeleteSQS = new DeleteSQS() {
 
             @Override
-            protected AmazonSQSClient getClient(ProcessContext context) {
+            protected SqsClient getClient(ProcessContext context) {
                 return mockSQSClient;
             }
         };
@@ -71,8 +71,8 @@ public class TestDeleteSQS {
         ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = 
ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
         Mockito.verify(mockSQSClient, 
Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
         DeleteMessageBatchRequest deleteRequest = 
captureDeleteRequest.getValue();
-        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 deleteRequest.getQueueUrl());
-        assertEquals("test-receipt-handle-1", 
deleteRequest.getEntries().get(0).getReceiptHandle());
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 deleteRequest.queueUrl());
+        assertEquals("test-receipt-handle-1", 
deleteRequest.entries().get(0).receiptHandle());
 
         runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
     }
@@ -92,7 +92,7 @@ public class TestDeleteSQS {
         ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = 
ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
         Mockito.verify(mockSQSClient, 
Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
         DeleteMessageBatchRequest deleteRequest = 
captureDeleteRequest.getValue();
-        assertEquals("test-receipt-handle-1", 
deleteRequest.getEntries().get(0).getReceiptHandle());
+        assertEquals("test-receipt-handle-1", 
deleteRequest.entries().get(0).receiptHandle());
 
         runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
     }
@@ -105,7 +105,7 @@ public class TestDeleteSQS {
         ff1Attributes.put("sqs.receipt.handle", "test-receipt-handle-1");
         runner.enqueue("TestMessageBody1", ff1Attributes);
         
Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class)))
-                .thenThrow(new AmazonSQSException("TestFail"));
+                .thenThrow(new RuntimeException());
 
         runner.assertValid();
         runner.run(1);
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
index edef28a62c..908caf18b8 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
@@ -16,12 +16,6 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -30,8 +24,17 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
-
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
+
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -40,14 +43,14 @@ public class TestGetSQS {
 
     private TestRunner runner = null;
     private GetSQS mockGetSQS = null;
-    private AmazonSQSClient mockSQSClient = null;
+    private SqsClient mockSQSClient = null;
 
     @BeforeEach
     public void setUp() {
-        mockSQSClient = Mockito.mock(AmazonSQSClient.class);
+        mockSQSClient = Mockito.mock(SqsClient.class);
         mockGetSQS = new GetSQS() {
             @Override
-            protected AmazonSQSClient getClient(ProcessContext context) {
+            protected SqsClient getClient(ProcessContext context) {
                 return mockSQSClient;
             }
         };
@@ -59,17 +62,24 @@ public class TestGetSQS {
         runner.setProperty(GetSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
         runner.setProperty(GetSQS.AUTO_DELETE, "false");
 
-        Message message1 = new Message();
-        message1.setBody("TestMessage1");
-        message1.addAttributesEntry("attrib-key-1", "attrib-value-1");
-        MessageAttributeValue messageAttributeValue = new 
MessageAttributeValue();
-        messageAttributeValue.setStringValue("msg-attrib-value-1");
-        message1.addMessageAttributesEntry("msg-attrib-key-1", 
messageAttributeValue);
-        message1.setMD5OfBody("test-md5-hash-1");
-        message1.setMessageId("test-message-id-1");
-        message1.setReceiptHandle("test-receipt-handle-1");
-        ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult()
-                .withMessages(message1);
+        final Map<String, String> attributes = new HashMap<>();
+        final Map<String, MessageAttributeValue> messageAttributes = new 
HashMap<>();
+        MessageAttributeValue messageAttributeValue = 
MessageAttributeValue.builder()
+                .stringValue("msg-attrib-value-1").build();
+        messageAttributes.put("msg-attrib-key-1", messageAttributeValue);
+        attributes.put("attrib-key-1", "attrib-value-1"); // This attribute is 
no longer valid in SDK v2
+        attributes.put(MessageSystemAttributeName.MESSAGE_GROUP_ID.toString(), 
"attrib-value-1"); // However, this one is allowed
+        Message message1 = Message.builder()
+                .body("TestMessage1")
+                .attributesWithStrings(attributes)
+                .messageAttributes(messageAttributes)
+                .md5OfBody("test-md5-hash-1")
+                .messageId("test-message-id-1")
+                .receiptHandle("test-receipt-handle-1")
+                .build();
+        ReceiveMessageResponse receiveMessageResult = 
ReceiveMessageResponse.builder()
+                .messages(message1)
+                .build();
         
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
 
         runner.run(1);
@@ -77,13 +87,14 @@ public class TestGetSQS {
         ArgumentCaptor<ReceiveMessageRequest> captureRequest = 
ArgumentCaptor.forClass(ReceiveMessageRequest.class);
         Mockito.verify(mockSQSClient, 
Mockito.times(1)).receiveMessage(captureRequest.capture());
         ReceiveMessageRequest request = captureRequest.getValue();
-        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.getQueueUrl());
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.queueUrl());
         Mockito.verify(mockSQSClient, 
Mockito.never()).deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class));
 
         runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 1);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
         MockFlowFile ff0 = flowFiles.get(0);
-        ff0.assertAttributeEquals("sqs.attrib-key-1", "attrib-value-1");
+        ff0.assertAttributeNotExists("sqs.attrib-key-1");
+        ff0.assertAttributeEquals("sqs.MessageGroupId", "attrib-value-1");
         ff0.assertAttributeEquals("sqs.msg-attrib-key-1", 
"msg-attrib-value-1");
         ff0.assertAttributeEquals("hash.value", "test-md5-hash-1");
         ff0.assertAttributeEquals("hash.algorithm", "md5");
@@ -94,7 +105,7 @@ public class TestGetSQS {
     @Test
     public void testGetNoMessages() {
         runner.setProperty(GetSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
-        ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult();
+        ReceiveMessageResponse receiveMessageResult = 
ReceiveMessageResponse.builder().build();
         
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
 
         runner.run(1);
@@ -102,7 +113,7 @@ public class TestGetSQS {
         ArgumentCaptor<ReceiveMessageRequest> captureRequest = 
ArgumentCaptor.forClass(ReceiveMessageRequest.class);
         Mockito.verify(mockSQSClient, 
Mockito.times(1)).receiveMessage(captureRequest.capture());
         ReceiveMessageRequest request = captureRequest.getValue();
-        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.getQueueUrl());
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.queueUrl());
 
         runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 0);
     }
@@ -112,16 +123,19 @@ public class TestGetSQS {
         runner.setProperty(GetSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
         runner.setProperty(GetSQS.AUTO_DELETE, "true");
 
-        Message message1 = new Message();
-        message1.setBody("TestMessage1");
-        message1.setMessageId("test-message-id-1");
-        message1.setReceiptHandle("test-receipt-handle-1");
-        Message message2 = new Message();
-        message2.setBody("TestMessage2");
-        message2.setMessageId("test-message-id-2");
-        message2.setReceiptHandle("test-receipt-handle-2");
-        ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult()
-                .withMessages(message1, message2);
+        Message message1 = Message.builder()
+                .body("TestMessage1")
+                .messageId("test-message-id-1")
+                .receiptHandle("test-receipt-handle-1")
+                .build();
+        Message message2 = Message.builder()
+                .body("TestMessage2")
+                .messageId("test-message-id-2")
+                .receiptHandle("test-receipt-handle-2")
+                .build();
+        ReceiveMessageResponse receiveMessageResult = 
ReceiveMessageResponse.builder()
+                .messages(message1, message2)
+                .build();
         
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
 
         runner.run(1);
@@ -129,14 +143,14 @@ public class TestGetSQS {
         ArgumentCaptor<ReceiveMessageRequest> captureReceiveRequest = 
ArgumentCaptor.forClass(ReceiveMessageRequest.class);
         Mockito.verify(mockSQSClient, 
Mockito.times(1)).receiveMessage(captureReceiveRequest.capture());
         ReceiveMessageRequest receiveRequest = 
captureReceiveRequest.getValue();
-        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 receiveRequest.getQueueUrl());
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 receiveRequest.queueUrl());
 
         ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = 
ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
         Mockito.verify(mockSQSClient, 
Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
         DeleteMessageBatchRequest deleteRequest = 
captureDeleteRequest.getValue();
-        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 deleteRequest.getQueueUrl());
-        assertEquals("test-message-id-1", 
deleteRequest.getEntries().get(0).getId());
-        assertEquals("test-message-id-2", 
deleteRequest.getEntries().get(1).getId());
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 deleteRequest.queueUrl());
+        assertEquals("test-message-id-1", deleteRequest.entries().get(0).id());
+        assertEquals("test-message-id-2", deleteRequest.entries().get(1).id());
 
         runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 2);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
index 9f23423e1c..882745bc79 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
@@ -16,10 +16,6 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.AmazonSQSException;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -27,6 +23,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -39,14 +38,14 @@ public class TestPutSQS {
 
     private TestRunner runner = null;
     private PutSQS mockPutSQS = null;
-    private AmazonSQSClient mockSQSClient = null;
+    private SqsClient mockSQSClient = null;
 
     @BeforeEach
     public void setUp() {
-        mockSQSClient = Mockito.mock(AmazonSQSClient.class);
+        mockSQSClient = Mockito.mock(SqsClient.class);
         mockPutSQS = new PutSQS() {
             @Override
-            protected AmazonSQSClient getClient(ProcessContext context) {
+            protected SqsClient getClient(ProcessContext context) {
                 return mockSQSClient;
             }
         };
@@ -62,7 +61,7 @@ public class TestPutSQS {
         attrs.put("filename", "1.txt");
         runner.enqueue("TestMessageBody", attrs);
 
-        SendMessageBatchResult batchResult = new SendMessageBatchResult();
+        SendMessageBatchResponse batchResult = 
SendMessageBatchResponse.builder().build();
         
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult);
 
         runner.run(1);
@@ -70,9 +69,9 @@ public class TestPutSQS {
         ArgumentCaptor<SendMessageBatchRequest> captureRequest = 
ArgumentCaptor.forClass(SendMessageBatchRequest.class);
         Mockito.verify(mockSQSClient, 
Mockito.times(1)).sendMessageBatch(captureRequest.capture());
         SendMessageBatchRequest request = captureRequest.getValue();
-        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.getQueueUrl());
-        assertEquals("hello", 
request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue());
-        assertEquals("TestMessageBody", 
request.getEntries().get(0).getMessageBody());
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.queueUrl());
+        assertEquals("hello", 
request.entries().get(0).messageAttributes().get("x-custom-prop").stringValue());
+        assertEquals("TestMessageBody", 
request.entries().get(0).messageBody());
 
         runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
     }
@@ -85,15 +84,15 @@ public class TestPutSQS {
         attrs.put("filename", "1.txt");
         runner.enqueue("TestMessageBody", attrs);
 
-        
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new
 AmazonSQSException("TestFail"));
+        
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new
 RuntimeException());
 
         runner.run(1);
 
         ArgumentCaptor<SendMessageBatchRequest> captureRequest = 
ArgumentCaptor.forClass(SendMessageBatchRequest.class);
         Mockito.verify(mockSQSClient, 
Mockito.times(1)).sendMessageBatch(captureRequest.capture());
         SendMessageBatchRequest request = captureRequest.getValue();
-        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.getQueueUrl());
-        assertEquals("TestMessageBody", 
request.getEntries().get(0).getMessageBody());
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.queueUrl());
+        assertEquals("TestMessageBody", 
request.entries().get(0).messageBody());
 
         runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1);
     }
@@ -110,7 +109,7 @@ public class TestPutSQS {
         attrs.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236");
         runner.enqueue("TestMessageBody", attrs);
 
-        SendMessageBatchResult batchResult = new SendMessageBatchResult();
+        final SendMessageBatchResponse batchResult = 
SendMessageBatchResponse.builder().build();
         
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult);
 
         runner.run(1);
@@ -118,11 +117,11 @@ public class TestPutSQS {
         ArgumentCaptor<SendMessageBatchRequest> captureRequest = 
ArgumentCaptor.forClass(SendMessageBatchRequest.class);
         Mockito.verify(mockSQSClient, 
Mockito.times(1)).sendMessageBatch(captureRequest.capture());
         SendMessageBatchRequest request = captureRequest.getValue();
-        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.getQueueUrl());
-        assertEquals("hello", 
request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue());
-        assertEquals("TestMessageBody", 
request.getEntries().get(0).getMessageBody());
-        assertEquals("test1234", 
request.getEntries().get(0).getMessageGroupId());
-        assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", 
request.getEntries().get(0).getMessageDeduplicationId());
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
 request.queueUrl());
+        assertEquals("hello", 
request.entries().get(0).messageAttributes().get("x-custom-prop").stringValue());
+        assertEquals("TestMessageBody", 
request.entries().get(0).messageBody());
+        assertEquals("test1234", request.entries().get(0).messageGroupId());
+        assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", 
request.entries().get(0).messageDeduplicationId());
 
         runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
     }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/pom.xml 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/pom.xml
index be846c0833..0c5e1f45b3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/pom.xml
@@ -32,6 +32,10 @@
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>auth</artifactId>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-s3</artifactId>

Reply via email to