This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new cc0b4d4  [FLINK-35305][Connector/SQS] Amazon SQS Sink Connector
cc0b4d4 is described below

commit cc0b4d4b8e00feabe4e09cd9a24c8389d5bda04e
Author: Priya Dhingra <[email protected]>
AuthorDate: Fri May 10 14:30:50 2024 -0700

    [FLINK-35305][Connector/SQS] Amazon SQS Sink Connector
---
 docs/content.zh/docs/connectors/datastream/sqs.md  | 134 +++++++
 docs/content/docs/connectors/datastream/sqs.md     | 132 +++++++
 .../flink-connector-aws-sqs-e2e-tests/pom.xml      | 108 ++++++
 .../connector/sqs/sink/test/SqsSinkITTest.java     | 161 +++++++++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 flink-connector-aws-e2e-tests/pom.xml              |   1 +
 .../54da9a7d-14d2-4632-a045-1dd8fc665c8f           |   0
 .../a6cbd99c-b115-447a-8f19-43c1094db549           |   0
 .../archunit-violations/stored.rules               |   4 +
 flink-connector-aws/flink-connector-sqs/pom.xml    | 136 +++++++
 .../connector/sqs/sink/SqsConfigConstants.java     |  39 ++
 .../sqs/sink/SqsExceptionClassifiers.java          |  61 ++++
 .../apache/flink/connector/sqs/sink/SqsSink.java   | 158 ++++++++
 .../flink/connector/sqs/sink/SqsSinkBuilder.java   | 160 +++++++++
 .../sqs/sink/SqsSinkElementConverter.java          |  89 +++++
 .../flink/connector/sqs/sink/SqsSinkException.java |  59 +++
 .../flink/connector/sqs/sink/SqsSinkWriter.java    | 235 ++++++++++++
 .../connector/sqs/sink/SqsStateSerializer.java     |  70 ++++
 .../sqs/sink/client/SdkClientProvider.java         |  36 ++
 .../sqs/sink/client/SqsAsyncClientProvider.java    |  64 ++++
 .../src/main/resources/log4j2.properties           |  25 ++
 .../architecture/TestCodeArchitectureTest.java     |  40 +++
 .../sqs/sink/SqsExceptionClassifiersTest.java      |  87 +++++
 .../connector/sqs/sink/SqsSinkBuilderTest.java     |  48 +++
 .../sqs/sink/SqsSinkElementConverterTest.java      |  72 ++++
 .../flink/connector/sqs/sink/SqsSinkTest.java      | 153 ++++++++
 .../connector/sqs/sink/SqsSinkWriterTest.java      | 396 +++++++++++++++++++++
 .../connector/sqs/sink/SqsStateSerializerTest.java |  80 +++++
 .../connector/sqs/sink/testutils/SqsTestUtils.java |  60 ++++
 .../src/test/resources/archunit.properties         |  31 ++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 flink-connector-aws/pom.xml                        |   7 +-
 32 files changed, 2699 insertions(+), 3 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/sqs.md 
b/docs/content.zh/docs/connectors/datastream/sqs.md
new file mode 100644
index 0000000..06c99db
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/sqs.md
@@ -0,0 +1,134 @@
+---
+title: SQS
+weight: 5
+type: docs
+aliases:
+   - /zh/dev/connectors/sqs.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS 
v2 SDK for 
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
 Follow the instructions from the [Amazon SQS Developer 
Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS message queue.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-sqs sqs >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+
+// Optional, use following if you want to provide access via AssumeRole, 
Please make sure given IAM role has "sqs:SendMessage" permission
+sinkProperties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"ASSUME_ROLE");
+sinkProperties.setProperty(AWSConfigConstants.AWS_ROLE_ARN, 
"replace-this-with-IAMRole-arn");
+sinkProperties.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, 
"any-session-name-string");
+
+SqsSink<String> sqsSink =
+        SqsSink.<String>builder()
+                .setSerializationSchema(new SimpleStringSchema())              
  // Required
+                
.setSqsUrl("https://sqs.us-east-1.amazonaws.com/xxxx/test-sqs";)  // Required
+                .setSqsClientProperties(sinkProperties)                        
  // Required
+                .setFailOnError(false)                                         
  // Optional
+                .setMaxBatchSize(10)                                           
  // Optional
+                .setMaxInFlightRequests(50)                                    
  // Optional
+                .setMaxBufferedRequests(1_000)                                 
  // Optional
+                .setMaxBatchSizeInBytes(256 * 1024)                         // 
Optional
+                .setMaxTimeInBufferMS(5000)                                    
  // Optional
+                .setMaxRecordSizeInBytes(256 * 1024)                           
 // Optional
+                .build();
+
+flinkStream.sinkTo(sqsSink)
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+
+val SqsSink<String> sqsSink =
+                SqsSink.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())      
          // Required
+                        
.setSqsUrl("https://sqs.us-east-1.amazonaws.com/xxxx/test-sqs";)  // Required
+                        .setSqsClientProperties(sinkProperties)                
          // Required
+                        .setFailOnError(false)                                 
          // Optional
+                        .setMaxBatchSize(10)                                   
          // Optional
+                        .setMaxInFlightRequests(50)                            
          // Optional
+                        .setMaxBufferedRequests(1_000)                         
          // Optional
+                        .setMaxBatchSizeInBytes(256 * 1024)                    
          // Optional
+                        .setMaxTimeInBufferMS(5000)                            
          // Optional
+                        .setMaxRecordSizeInBytes(256 * 1024)                   
          // Optional
+                        .build();
+                        
+                       
+flinkStream.sinkTo(sqsSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's SQS sink is created by using the static builder 
`SqsSink.<String>builder()`.
+
+1. __setSqsClientProperties(Properties sinkProperties)__
+   * Required.
+   * Supplies credentials, region and other parameters to the SQS client.
+2. __setSerializationSchema(SerializationSchema<InputType> 
serializationSchema)__
+   * Required.
+   * Supplies a serialization schema to the Sink. This schema is used to 
serialize elements before sending to SQS.
+3. __setSqsUrl(String sqsUrl)__
+   * Required.
+   * Url of the SQS to sink to.
+4. _setFailOnError(boolean failOnError)_
+   * Optional. Default: `false`.
+   * Whether failed requests to write records to SQS are treated as fatal 
exceptions in the sink that cause a Flink Job to restart
+5. _setMaxBatchSize(int maxBatchSize)_
+   * Optional. Default: `10`.
+   * Maximum size of a batch to write to SQS.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+   * Optional. Default: `50`.
+   * The maximum number of in flight requests allowed before the sink applies 
backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+   * Optional. Default: `5_000`.
+   * The maximum number of records that may be buffered in the sink before 
backpressure is applied.
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+   * Optional. Default: `256 * 1024`.
+   * The maximum size (in bytes) a batch may become. All batches sent will be 
smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+   * Optional. Default: `5000`.
+   * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+   * Optional. Default: `256 * 1024`.
+   * The maximum record size that the sink will accept, records larger than 
this will be automatically rejected.
+11. _build()_
+   * Constructs and returns the SQS sink.
+
diff --git a/docs/content/docs/connectors/datastream/sqs.md 
b/docs/content/docs/connectors/datastream/sqs.md
new file mode 100644
index 0000000..a3648f9
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/sqs.md
@@ -0,0 +1,132 @@
+---
+title: SQS
+weight: 5
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS 
v2 SDK for 
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
 Follow the instructions from the [Amazon SQS Developer 
Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS message queue.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-sqs sqs >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+
+// Optional, use following if you want to provide access via AssumeRole, 
Please make sure given IAM role has "sqs:SendMessage" permission
+sinkProperties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"ASSUME_ROLE");
+sinkProperties.setProperty(AWSConfigConstants.AWS_ROLE_ARN, 
"replace-this-with-IAMRole-arn");
+sinkProperties.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, 
"any-session-name-string");
+
+SqsSink<String> sqsSink =
+        SqsSink.<String>builder()
+                .setSerializationSchema(new SimpleStringSchema())              
  // Required
+                
.setSqsUrl("https://sqs.us-east-1.amazonaws.com/xxxx/test-sqs";)  // Required
+                .setSqsClientProperties(sinkProperties)                        
  // Required
+                .setFailOnError(false)                                         
  // Optional
+                .setMaxBatchSize(10)                                           
  // Optional
+                .setMaxInFlightRequests(50)                                    
  // Optional
+                .setMaxBufferedRequests(1_000)                                 
  // Optional
+                .setMaxBatchSizeInBytes(256 * 1024)                         // 
Optional
+                .setMaxTimeInBufferMS(5000)                                    
  // Optional
+                .setMaxRecordSizeInBytes(256 * 1024)                           
 // Optional
+                .build();
+
+flinkStream.sinkTo(sqsSink)
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+
+val SqsSink<String> sqsSink =
+                SqsSink.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())      
          // Required
+                        
.setSqsUrl("https://sqs.us-east-1.amazonaws.com/xxxx/test-sqs";)  // Required
+                        .setSqsClientProperties(sinkProperties)                
          // Required
+                        .setFailOnError(false)                                 
          // Optional
+                        .setMaxBatchSize(10)                                   
          // Optional
+                        .setMaxInFlightRequests(50)                            
          // Optional
+                        .setMaxBufferedRequests(1_000)                         
          // Optional
+                        .setMaxBatchSizeInBytes(256 * 1024)                    
          // Optional
+                        .setMaxTimeInBufferMS(5000)                            
          // Optional
+                        .setMaxRecordSizeInBytes(256 * 1024)                   
          // Optional
+                        .build();
+                        
+                       
+flinkStream.sinkTo(sqsSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's SQS sink is created by using the static builder 
`SqsSink.<String>builder()`.
+
+1. __setSqsClientProperties(Properties sinkProperties)__
+   * Required.
+   * Supplies credentials, region and other parameters to the SQS client.
+2. __setSerializationSchema(SerializationSchema<InputType> 
serializationSchema)__
+   * Required.
+   * Supplies a serialization schema to the Sink. This schema is used to 
serialize elements before sending to SQS.
+3. __setSqsUrl(String sqsUrl)__
+   * Required.
+   * Url of the SQS to sink to.
+4. _setFailOnError(boolean failOnError)_
+   * Optional. Default: `false`.
+   * Whether failed requests to write records to SQS are treated as fatal 
exceptions in the sink that cause a Flink Job to restart
+5. _setMaxBatchSize(int maxBatchSize)_
+   * Optional. Default: `10`.
+   * Maximum size of a batch to write to SQS.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+   * Optional. Default: `50`.
+   * The maximum number of in flight requests allowed before the sink applies 
backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+   * Optional. Default: `5_000`.
+   * The maximum number of records that may be buffered in the sink before 
backpressure is applied.
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+   * Optional. Default: `256 * 1024`.
+   * The maximum size (in bytes) a batch may become. All batches sent will be 
smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+   * Optional. Default: `5000`.
+   * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+* Optional. Default: `256 * 1024`.
+* The maximum record size that the sink will accept, records larger than this 
will be automatically rejected.
+11. _build()_
+* Constructs and returns the SQS sink.
+
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml 
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml
new file mode 100644
index 0000000..3259a6f
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.4-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <!-- Other third-party dependencies -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <dependencyManagement>
+        <dependencies>
+
+            <!-- Overridden aws-sdk dependency to older version to temporarily 
fix 'not able to create sqs localstack error with newer version'-->
+            <dependency>
+                <groupId>software.amazon.awssdk</groupId>
+                <artifactId>bom</artifactId>
+                <version>2.20.144</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+</project>
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java
new file mode 100644
index 0000000..97a4e16
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.SqsSinkElementConverter;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static 
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkITTest.class);
+
+    private static final int NUMBER_OF_ELEMENTS = 50;
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private SqsClient sqsClient;
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule
+    public static LocalstackContainer mockSqsContainer =
+            new 
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+                    .withNetwork(network)
+                    .withNetworkAliases("localstack");
+
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
+                    .environmentVariable("AWS_CBOR_DISABLE", "1")
+                    .environmentVariable(
+                            "FLINK_ENV_JAVA_OPTS",
+                            
"-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking 
-Daws.cborEnabled=false")
+                    .network(network)
+                    .logger(LOG)
+                    .dependsOn(mockSqsContainer)
+                    .build();
+
+    public static final FlinkContainers FLINK =
+            
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    @Before
+    public void setup() throws Exception {
+        httpClient = AWSServicesTestUtils.createHttpClient();
+        sqsClient = createSqsClient(mockSqsContainer.getEndpoint(), 
httpClient);
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        LOG.info("Done setting up the localstack.");
+    }
+
+    @BeforeClass
+    public static void setupFlink() throws Exception {
+        FLINK.start();
+    }
+
+    @AfterClass
+    public static void stopFlink() {
+        FLINK.stop();
+    }
+
+    @After
+    public void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+        httpClient.close();
+        sqsClient.close();
+    }
+
+    @Test
+    public void sqsSinkWritesCorrectDataToMockAWSServices() throws Exception {
+        LOG.info("1 - Creating the SQS");
+        SqsTestUtils.createSqs("test-sqs", sqsClient);
+
+        SqsSink<String> sqsSink =
+                SqsSink.<String>builder()
+                        .setSqsSinkElementConverter(
+                                SqsSinkElementConverter.<String>builder()
+                                        .setSerializationSchema(new 
SimpleStringSchema())
+                                        .build())
+                        .setSqsUrl("http://localhost:4576/queue/test-sqs";)
+                        
.setSqsClientProperties(createConfig(mockSqsContainer.getEndpoint()))
+                        .build();
+
+        SqsTestUtils.getSampleDataGenerator(env, 
NUMBER_OF_ELEMENTS).sinkTo(sqsSink);
+        env.execute("Integration Test");
+        List<Message> messages = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            // Read data from SQS and validate
+            ReceiveMessageRequest receiveMessageRequest =
+                    ReceiveMessageRequest.builder()
+                            .queueUrl("http://localhost:4576/queue/test-sqs";)
+                            .maxNumberOfMessages(10) // max 10 can be read at 
a time
+                            .build();
+
+            
messages.addAll(sqsClient.receiveMessage(receiveMessageRequest).messages());
+        }
+
+        // Add assertions here to validate the messages
+        assertEquals(
+                NUMBER_OF_ELEMENTS,
+                messages.size(),
+                "Number of messages received should match the number of 
elements sent");
+
+        List<String> sentDataList = new ArrayList<>();
+        SqsTestUtils.getSampleDataGenerator(env, NUMBER_OF_ELEMENTS)
+                .executeAndCollect()
+                .forEachRemaining(sentDataList::add);
+
+        List<String> receivedDataList = new ArrayList<>();
+        for (Message message : messages) {
+            receivedDataList.add(new String(message.body()));
+        }
+
+        
Assertions.assertThat(sentDataList.containsAll(receivedDataList)).isTrue();
+    }
+}
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/resources/log4j2-test.properties
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-aws-e2e-tests/pom.xml 
b/flink-connector-aws-e2e-tests/pom.xml
index e291d3d..ee59f73 100644
--- a/flink-connector-aws-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/pom.xml
@@ -41,6 +41,7 @@ under the License.
         <module>flink-connector-aws-kinesis-firehose-e2e-tests</module>
         <module>flink-connector-aws-kinesis-streams-e2e-tests</module>
         <module>flink-connector-kinesis-e2e-tests</module>
+        <module>flink-connector-aws-sqs-e2e-tests</module>
         <module>flink-formats-avro-glue-schema-registry-e2e-tests</module>
         <module>flink-formats-json-glue-schema-registry-e2e-tests</module>
     </modules>
diff --git 
a/flink-connector-aws/flink-connector-sqs/archunit-violations/54da9a7d-14d2-4632-a045-1dd8fc665c8f
 
b/flink-connector-aws/flink-connector-sqs/archunit-violations/54da9a7d-14d2-4632-a045-1dd8fc665c8f
new file mode 100644
index 0000000..e69de29
diff --git 
a/flink-connector-aws/flink-connector-sqs/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
 
b/flink-connector-aws/flink-connector-sqs/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
new file mode 100644
index 0000000..e69de29
diff --git 
a/flink-connector-aws/flink-connector-sqs/archunit-violations/stored.rules 
b/flink-connector-aws/flink-connector-sqs/archunit-violations/stored.rules
new file mode 100644
index 0000000..cf8b667
--- /dev/null
+++ b/flink-connector-aws/flink-connector-sqs/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Tue Feb 22 12:19:26 CET 2022
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ 
ITCase=54da9a7d-14d2-4632-a045-1dd8fc665c8f
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ 
extension=a6cbd99c-b115-447a-8f19-43c1094db549
diff --git a/flink-connector-aws/flink-connector-sqs/pom.xml 
b/flink-connector-aws/flink-connector-sqs/pom.xml
new file mode 100644
index 0000000..00e3c29
--- /dev/null
+++ b/flink-connector-aws/flink-connector-sqs/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.4-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-connector-sqs</artifactId>
+    <name>Flink : Connectors : AWS : Amazon SQS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sqs</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>netty-nio-client</artifactId>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+        </dependency>
+
+        <!-- ArchUnit test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-architecture-tests-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsConfigConstants.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsConfigConstants.java
new file mode 100644
index 0000000..61e5eb6
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsConfigConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Defaults for {@link SqsSinkWriter}. */
+@PublicEvolving
+public class SqsConfigConstants {
+
+    public static final ConfigOption<String> BASE_SQS_USER_AGENT_PREFIX_FORMAT 
=
+            ConfigOptions.key("Apache Flink %s (%s) SQS Connector")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("SQS useragent prefix format.");
+
+    public static final ConfigOption<String> SQS_CLIENT_USER_AGENT_PREFIX =
+            ConfigOptions.key("aws.sqs.client.user-agent-prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("SQS identifier for user agent prefix.");
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiers.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiers.java
new file mode 100644
index 0000000..7f6d39e
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiers.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+/**
+ * Class containing set of {@link FatalExceptionClassifier} for {@link
+ * software.amazon.awssdk.services.sqs.model.SqsException}.
+ */
+@Internal
+public class SqsExceptionClassifiers {
+
+    public static FatalExceptionClassifier 
getNotAuthorizedExceptionClassifier() {
+        return AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+                SqsException.class,
+                "NotAuthorized",
+                err ->
+                        new SqsSinkException(
+                                "Encountered non-recoverable exception: 
NotAuthorized", err));
+    }
+
+    public static FatalExceptionClassifier 
getAccessDeniedExceptionClassifier() {
+        return AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+                SqsException.class,
+                "AccessDeniedException",
+                err ->
+                        new SqsSinkException(
+                                "Encountered non-recoverable exception: 
AccessDeniedException",
+                                err));
+    }
+
+    public static FatalExceptionClassifier 
getResourceNotFoundExceptionClassifier() {
+        return FatalExceptionClassifier.withRootCauseOfType(
+                ResourceNotFoundException.class,
+                err ->
+                        new SqsSinkException(
+                                "Encountered non-recoverable exception 
relating to not being able to find the specified resources",
+                                err));
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSink.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSink.java
new file mode 100644
index 0000000..eea6109
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSink.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+import org.apache.flink.connector.sqs.sink.client.SqsAsyncClientProvider;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * A SQS Sink that performs async requests against a destination SQS using the 
buffering protocol
+ * specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link 
software.amazon.awssdk.services.sqs.SqsAsyncClient} to
+ * communicate with the AWS endpoint.
+ *
+ * <p>Please see the writer implementation in {@link SqsSinkWriter}
+ *
+ * @param <InputT> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class SqsSink<InputT> extends AsyncSinkBase<InputT, 
SendMessageBatchRequestEntry> {
+
+    private final boolean failOnError;
+    private final String sqsUrl;
+    private final Properties sqsClientProperties;
+    private transient SdkClientProvider<SqsAsyncClient> 
asyncClientSdkClientProviderOverride;
+
+    SqsSink(
+            ElementConverter<InputT, SendMessageBatchRequestEntry> 
elementConverter,
+            Integer maxBatchSize,
+            Integer maxInFlightRequests,
+            Integer maxBufferedRequests,
+            Long maxBatchSizeInBytes,
+            Long maxTimeInBufferMS,
+            Long maxRecordSizeInBytes,
+            boolean failOnError,
+            String sqsUrl,
+            Properties sqsClientProperties) {
+        super(
+                elementConverter,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.sqsUrl =
+                Preconditions.checkNotNull(
+                        sqsUrl, "The sqs url must not be null when 
initializing the SQS Sink.");
+        Preconditions.checkArgument(
+                !this.sqsUrl.isEmpty(), "The sqs url must be set when 
initializing the SQS Sink.");
+
+        Preconditions.checkArgument(
+                (this.getMaxBatchSize() <= 10),
+                "The sqs MaxBatchSize must not be greater than 10.");
+        this.failOnError = failOnError;
+        this.sqsClientProperties = sqsClientProperties;
+    }
+
+    /**
+     * Create a {@link SqsSinkBuilder} to allow the fluent construction of a 
new {@code SqsSink}.
+     *
+     * @param <InputT> type of incoming records
+     * @return {@link SqsSinkBuilder}
+     */
+    public static <InputT> SqsSinkBuilder<InputT> builder() {
+        return new SqsSinkBuilder<>();
+    }
+
+    @Override
+    public StatefulSinkWriter<InputT, 
BufferedRequestState<SendMessageBatchRequestEntry>>
+            createWriter(InitContext context) throws IOException {
+        return new SqsSinkWriter<>(
+                getElementConverter(),
+                context,
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                failOnError,
+                sqsUrl,
+                getAsyncClientProvider(sqsClientProperties),
+                Collections.emptyList());
+    }
+
+    @Override
+    public StatefulSinkWriter<InputT, 
BufferedRequestState<SendMessageBatchRequestEntry>>
+            restoreWriter(
+                    InitContext context,
+                    
Collection<BufferedRequestState<SendMessageBatchRequestEntry>> recoveredState)
+                    throws IOException {
+        return new SqsSinkWriter<>(
+                getElementConverter(),
+                context,
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                failOnError,
+                sqsUrl,
+                getAsyncClientProvider(sqsClientProperties),
+                recoveredState);
+    }
+
+    private SdkClientProvider<SqsAsyncClient> 
getAsyncClientProvider(Properties clientProperties) {
+        if (asyncClientSdkClientProviderOverride != null) {
+            return asyncClientSdkClientProviderOverride;
+        }
+        return new SqsAsyncClientProvider(clientProperties);
+    }
+
+    @Internal
+    @VisibleForTesting
+    void setSqsAsyncClientProvider(
+            SdkClientProvider<SqsAsyncClient> 
asyncClientSdkClientProviderOverride) {
+        this.asyncClientSdkClientProviderOverride = 
asyncClientSdkClientProviderOverride;
+    }
+
+    @Override
+    public 
SimpleVersionedSerializer<BufferedRequestState<SendMessageBatchRequestEntry>>
+            getWriterStateSerializer() {
+        return new SqsStateSerializer();
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java
new file mode 100644
index 0000000..9db622c
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link 
SqsSink} that writes String
+ * values to a SQS named sqsUrl.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink<String> sqsSink =
+ *         SqsSink.<String>builder()
+ *                 .setSqsUrl("sqsUrl")
+ *                 .setSqsClientProperties(sinkProperties)
+ *                 
.setSqsSinkElementConverter(SqsSinkElementConverter.<String>builder()
+ *                                 .setSerializationSchema(new 
SimpleStringSchema())
+ *                                 .build())
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 10
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 5000
+ *   <li>{@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code maxTimeInBufferMs} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry, 
SqsSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 10;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+    private Boolean failOnError;
+    private String sqsUrl;
+    private Properties sqsClientProperties;
+
+    private SqsSinkElementConverter<InputT> sqsSinkElementConverter;
+
+    SqsSinkBuilder() {}
+
+    /**
+     * Sets the url of the SQS that the sink will connect to. There is no 
default for this
+     * parameter, therefore, this must be provided at sink creation time 
otherwise the build will
+     * fail.
+     *
+     * @param sqsUrl the url of the Sqs
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSqsUrl(String sqsUrl) {
+        this.sqsUrl = sqsUrl;
+        return this;
+    }
+
+    public SqsSinkBuilder<InputT> setSqsSinkElementConverter(
+            final SqsSinkElementConverter<InputT> sqsSinkElementConverter) {
+        this.sqsSinkElementConverter = sqsSinkElementConverter;
+        return this;
+    }
+
+    /**
+     * If writing to SQS results in a partial or full failure being returned, 
the job will fail
+     * immediately with a {@link SqsSinkException} if failOnError is set.
+     *
+     * @param failOnError whether to fail on error
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setFailOnError(boolean failOnError) {
+        this.failOnError = failOnError;
+        return this;
+    }
+
+    /**
+     * A set of properties used by the sink to create the SQS client. This may 
be used to set the
+     * aws region, credentials etc. See the docs for usage and syntax.
+     *
+     * @param sqsClientProps SQS client properties
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSqsClientProperties(final Properties 
sqsClientProps) {
+        sqsClientProperties = sqsClientProps;
+        return this;
+    }
+
+    @VisibleForTesting
+    Properties getClientPropertiesWithDefaultHttpProtocol() {
+        Properties clientProperties =
+                Optional.ofNullable(sqsClientProperties).orElse(new 
Properties());
+        clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, 
DEFAULT_HTTP_PROTOCOL.toString());
+        return clientProperties;
+    }
+
+    @Override
+    public SqsSink<InputT> build() {
+        return new SqsSink<>(
+                Optional.ofNullable(sqsSinkElementConverter)
+                        .orElse(
+                                (SqsSinkElementConverter<InputT>)
+                                        
SqsSinkElementConverter.<String>builder()
+                                                .setSerializationSchema(new 
SimpleStringSchema())
+                                                .build()),
+                
Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
+                Optional.ofNullable(getMaxInFlightRequests())
+                        .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
+                
Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
+                
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
+                
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
+                
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
+                Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR),
+                sqsUrl,
+                getClientPropertiesWithDefaultHttpProtocol());
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverter.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverter.java
new file mode 100644
index 0000000..ba07b46
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS SQS SDK 
v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to 
transform it into a
+ * {@link SendMessageBatchRequestEntry} that may be persisted.
+ */
+@Internal
+public class SqsSinkElementConverter<InputT>
+        implements ElementConverter<InputT, SendMessageBatchRequestEntry> {
+
+    /** A serialization schema to specify how the input element should be 
serialized. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private SqsSinkElementConverter(SerializationSchema<InputT> 
serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public SendMessageBatchRequestEntry apply(InputT element, 
SinkWriter.Context context) {
+        final byte[] messageBody = serializationSchema.serialize(element);
+        return SendMessageBatchRequestEntry.builder()
+                .id(UUID.randomUUID().toString())
+                .messageBody(new String(messageBody, StandardCharsets.UTF_8))
+                .build();
+    }
+
+    @Override
+    public void open(Sink.InitContext context) {
+        try {
+            
serializationSchema.open(context.asSerializationSchemaInitializationContext());
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to initialize 
serialization schema.", e);
+        }
+    }
+
+    public static <InputT> Builder<InputT> builder() {
+        return new Builder<>();
+    }
+
+    /** A builder for the SqsSinkElementConverter. */
+    public static class Builder<InputT> {
+
+        private SerializationSchema<InputT> serializationSchema;
+
+        public Builder<InputT> setSerializationSchema(
+                SerializationSchema<InputT> serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        public SqsSinkElementConverter<InputT> build() {
+            Preconditions.checkNotNull(
+                    serializationSchema,
+                    "No SerializationSchema was supplied to the SQS Sink 
builder.");
+            return new SqsSinkElementConverter<>(serializationSchema);
+        }
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkException.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkException.java
new file mode 100644
index 0000000..d93305b
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkException.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** A {@link RuntimeException} wrapper indicating the exception was thrown 
from the SQS Sink. */
+@PublicEvolving
+class SqsSinkException extends RuntimeException {
+
+    public SqsSinkException(final String message) {
+        super(message);
+    }
+
+    public SqsSinkException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * When the flag {@code failOnError} is set in {@link SqsSinkWriter}, this 
exception is raised
+     * as soon as any exception occurs when writing to SQS.
+     */
+    static class SqsFailFastSinkException extends SqsSinkException {
+
+        private static final String ERROR_MESSAGE =
+                "Encountered an exception while persisting records, not 
retrying due to {failOnError} being set.";
+
+        public SqsFailFastSinkException() {
+            super(ERROR_MESSAGE);
+        }
+
+        public SqsFailFastSinkException(final String errorMessage) {
+            super(errorMessage);
+        }
+
+        public SqsFailFastSinkException(final String errorMessage, final 
Throwable cause) {
+            super(errorMessage, cause);
+        }
+
+        public SqsFailFastSinkException(final Throwable cause) {
+            super(ERROR_MESSAGE, cause);
+        }
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
new file mode 100644
index 0000000..459e068
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More details on the 
operation of this
+ * sink writer may be found in the doc for {@link SqsSink}. More details on 
the internals of this
+ * sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link SqsAsyncClient} used here may be configured in the standard 
way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} 
and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
SendMessageBatchRequestEntry> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkWriter.class);
+
+    private final SdkClientProvider<SqsAsyncClient> clientProvider;
+
+    private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+            AWSExceptionHandler.withClassifier(
+                    FatalExceptionClassifier.createChain(
+                            getInterruptedExceptionClassifier(),
+                            getInvalidCredentialsExceptionClassifier(),
+                            
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
+                            
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+                            
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+                            getSdkClientMisconfiguredExceptionClassifier()));
+
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* Url of SQS */
+    private final String sqsUrl;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    /* Flag to whether fatally fail any time we encounter an exception when 
persisting records */
+    private final boolean failOnError;
+
+    SqsSinkWriter(
+            ElementConverter<InputT, SendMessageBatchRequestEntry> 
elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            String sqsUrl,
+            SdkClientProvider<SqsAsyncClient> clientProvider,
+            Collection<BufferedRequestState<SendMessageBatchRequestEntry>> 
initialStates) {
+        super(
+                elementConverter,
+                context,
+                AsyncSinkWriterConfiguration.builder()
+                        .setMaxBatchSize(maxBatchSize)
+                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setMaxBufferedRequests(maxBufferedRequests)
+                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
+                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .build(),
+                initialStates);
+        this.failOnError = failOnError;
+        this.sqsUrl = sqsUrl;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
+        this.clientProvider = clientProvider;
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<SendMessageBatchRequestEntry> requestEntries,
+            Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+
+        final SendMessageBatchRequest batchRequest =
+                
SendMessageBatchRequest.builder().entries(requestEntries).queueUrl(sqsUrl).build();
+
+        CompletableFuture<SendMessageBatchResponse> future =
+                clientProvider.getClient().sendMessageBatch(batchRequest);
+
+        future.whenComplete(
+                        (response, err) -> {
+                            if (err != null) {
+                                handleFullyFailedRequest(err, requestEntries, 
requestResult);
+                            } else if (response.failed() != null && 
response.failed().size() > 0) {
+                                handlePartiallyFailedRequest(
+                                        response, requestEntries, 
requestResult);
+                            } else {
+                                requestResult.accept(Collections.emptyList());
+                            }
+                        })
+                .exceptionally(
+                        ex -> {
+                            getFatalExceptionCons()
+                                    .accept(
+                                            new 
SqsSinkException.SqsFailFastSinkException(
+                                                    ex.getMessage(), ex));
+                            return null;
+                        });
+    }
+
+    @Override
+    protected long getSizeInBytes(SendMessageBatchRequestEntry requestEntry) {
+        return 
requestEntry.messageBody().getBytes(StandardCharsets.UTF_8).length;
+    }
+
+    @Override
+    public void close() {
+        AWSGeneralUtil.closeResources(clientProvider);
+    }
+
+    private void handleFullyFailedRequest(
+            Throwable err,
+            List<SendMessageBatchRequestEntry> requestEntries,
+            Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+
+        numRecordsOutErrorsCounter.inc(requestEntries.size());
+        boolean isFatal = SQS_EXCEPTION_HANDLER.consumeIfFatal(err, 
getFatalExceptionCons());
+        if (isFatal) {
+            return;
+        }
+
+        if (failOnError) {
+            getFatalExceptionCons().accept(new 
SqsSinkException.SqsFailFastSinkException(err));
+            return;
+        }
+
+        LOG.warn(
+                "SQS Sink failed to write and will retry {} entries to SQS,  
First request was {}",
+                requestEntries.size(),
+                requestEntries.get(0).toString(),
+                err);
+        requestResult.accept(requestEntries);
+    }
+
+    private void handlePartiallyFailedRequest(
+            SendMessageBatchResponse response,
+            List<SendMessageBatchRequestEntry> requestEntries,
+            Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+
+        LOG.warn(
+                "handlePartiallyFailedRequest: SQS Sink failed to write and 
will retry {} entries to SQS",
+                response.failed().size());
+        numRecordsOutErrorsCounter.inc(response.failed().size());
+
+        if (failOnError) {
+            getFatalExceptionCons().accept(new 
SqsSinkException.SqsFailFastSinkException());
+            return;
+        }
+
+        final List<SendMessageBatchRequestEntry> failedRequestEntries =
+                new ArrayList<>(response.failed().size());
+
+        for (final BatchResultErrorEntry failedEntry : response.failed()) {
+            final Optional<SendMessageBatchRequestEntry> retryEntry =
+                    getFailedRecord(requestEntries, failedEntry.id());
+            if (retryEntry.isPresent()) {
+                failedRequestEntries.add(retryEntry.get());
+            } else {
+                LOG.error(
+                        "handlePartiallyFailedRequest: SQS Sink failed to 
retry unsuccessful SQS publish request due to invalid failed requestId");
+                getFatalExceptionCons()
+                        .accept(
+                                new SqsSinkException.SqsFailFastSinkException(
+                                        "SQS Sink failed to retry unsuccessful 
SQS publish request due to invalid failed requestId"));
+                return;
+            }
+        }
+
+        requestResult.accept(failedRequestEntries);
+    }
+
+    private Optional<SendMessageBatchRequestEntry> getFailedRecord(
+            List<SendMessageBatchRequestEntry> requestEntries, String 
selectedId) {
+        for (SendMessageBatchRequestEntry entry : requestEntries) {
+            if (entry.id().equals(selectedId)) {
+                return Optional.of(entry);
+            }
+        }
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsStateSerializer.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsStateSerializer.java
new file mode 100644
index 0000000..426409a
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsStateSerializer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/** SQS implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class SqsStateSerializer
+        extends AsyncSinkWriterStateSerializer<SendMessageBatchRequestEntry> {
+    @Override
+    protected void serializeRequestToStream(
+            final SendMessageBatchRequestEntry request, final DataOutputStream 
out)
+            throws IOException {
+        out.write(request.messageBody().getBytes(StandardCharsets.UTF_8));
+        serializeRequestId(request.id(), out);
+    }
+
+    protected void serializeRequestId(String requestId, DataOutputStream out) 
throws IOException {
+        out.writeInt(requestId.length());
+        out.write(requestId.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    protected SendMessageBatchRequestEntry deserializeRequestFromStream(
+            final long requestSize, final DataInputStream in) throws 
IOException {
+        final byte[] requestData = new byte[(int) requestSize];
+        in.read(requestData);
+        return SendMessageBatchRequestEntry.builder()
+                .id(deserializeRequestId(in))
+                .messageBody(new String(requestData, StandardCharsets.UTF_8))
+                .build();
+    }
+
+    protected String deserializeRequestId(DataInputStream in) throws 
IOException {
+        int requestIdLength = in.readInt();
+        byte[] requestIdData = new byte[(int) requestIdLength];
+        in.read(requestIdData);
+        return new String(requestIdData, StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SdkClientProvider.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SdkClientProvider.java
new file mode 100644
index 0000000..8c3b1ae
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SdkClientProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.client;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
+
+/** Provides a {@link SdkClient}. */
+@Internal
+public interface SdkClientProvider<T extends SdkClient> extends 
SdkAutoCloseable {
+
+    /**
+     * Returns {@link T}.
+     *
+     * @return the AWS SDK client
+     */
+    T getClient();
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SqsAsyncClientProvider.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SqsAsyncClientProvider.java
new file mode 100644
index 0000000..dfd6c80
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SqsAsyncClientProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.client;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.sqs.sink.SqsConfigConstants;
+
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+import java.util.Properties;
+
+/** Provides a {@link SqsAsyncClient}. */
+@Internal
+public class SqsAsyncClientProvider implements 
SdkClientProvider<SqsAsyncClient> {
+
+    private final SdkAsyncHttpClient httpClient;
+    private final SqsAsyncClient sqsAsyncClient;
+
+    public SqsAsyncClientProvider(Properties clientProperties) {
+        this.httpClient = 
AWSGeneralUtil.createAsyncHttpClient(clientProperties);
+        this.sqsAsyncClient = buildClient(clientProperties, httpClient);
+    }
+
+    @Override
+    public SqsAsyncClient getClient() {
+        return sqsAsyncClient;
+    }
+
+    @Override
+    public void close() {
+        AWSGeneralUtil.closeResources(httpClient, sqsAsyncClient);
+    }
+
+    private SqsAsyncClient buildClient(
+            Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+        AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+
+        return AWSClientUtil.createAwsAsyncClient(
+                sqsClientProperties,
+                httpClient,
+                SqsAsyncClient.builder(),
+                SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT.key(),
+                SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX.key());
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties 
b/flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..c64a340
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
new file mode 100644
index 0000000..ff6d2f3
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+/** Architecture tests for test code. */
+@AnalyzeClasses(
+        packages = "org.apache.flink.connector.sqs",
+        importOptions = {
+            ImportOption.OnlyIncludeTests.class,
+            ImportOptions.ExcludeScalaImportOption.class,
+            ImportOptions.ExcludeShadedImportOption.class
+        })
+public class TestCodeArchitectureTest {
+
+    @ArchTest
+    public static final ArchTests COMMON_TESTS = 
ArchTests.in(TestCodeArchitectureTestBase.class);
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiersTest.java
 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiersTest.java
new file mode 100644
index 0000000..4ebb4e2
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiersTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+    private final FatalExceptionClassifier classifier =
+            FatalExceptionClassifier.createChain(
+                    
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+                    
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+                    
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+    @Test
+    public void shouldClassifyNotAuthorizedAsFatal() {
+        AwsServiceException sqsException =
+                SqsException.builder()
+                        .awsErrorDetails(
+                                
AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+                        .build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(sqsException, ex -> {}));
+    }
+
+    @Test
+    public void shouldClassifyAccessDeniedExceptionAsFatal() {
+        AwsServiceException sqsException =
+                SqsException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        .errorCode("AccessDeniedException")
+                                        .build())
+                        .build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(sqsException, ex -> {}));
+    }
+
+    @Test
+    public void shouldClassifySocketTimeoutExceptionAsNonFatal() {
+        AwsServiceException sqsException =
+                SqsException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        .errorCode("SocketTimeoutException")
+                                        .build())
+                        .build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertTrue(classifier.isFatal(sqsException, ex -> {}));
+    }
+
+    @Test
+    public void shouldClassifyResourceNotFoundAsFatal() {
+        AwsServiceException sqsException = 
ResourceNotFoundException.builder().build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(sqsException, ex -> {}));
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java
 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java
new file mode 100644
index 0000000..bc6436a
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+/** Covers construction, defaults and sanity checking of {@link 
SqsSinkBuilder}. */
+class SqsSinkBuilderTest {
+
+    @Test
+    void sqsUrlOfSinkMustBeSetWhenBuilt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(() -> SqsSink.<String>builder().build())
+                .withMessageContaining(
+                        "The sqs url must not be null when initializing the 
SQS Sink.");
+    }
+
+    @Test
+    void sqsUrlOfSinkMustBeSetToNonEmptyWhenBuilt() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                SqsSink.<String>builder()
+                                        .setSqsUrl("")
+                                        .setSqsClientProperties(new 
Properties())
+                                        .setFailOnError(true)
+                                        .build())
+                .withMessageContaining("The sqs url must be set when 
initializing the SQS Sink.");
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java
 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java
new file mode 100644
index 0000000..93d0414
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Covers construction and sanity checking of {@link 
SqsSinkElementConverter}. */
+class SqsSinkElementConverterTest {
+
+    @Test
+    void 
elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt()
 {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(() -> 
SqsSinkElementConverter.<String>builder().build())
+                .withMessageContaining(
+                        "No SerializationSchema was supplied to the SQS Sink 
builder.");
+    }
+
+    @Test
+    void elementConverterUsesProvidedSchemaToSerializeRecord() {
+        ElementConverter<String, SendMessageBatchRequestEntry> 
elementConverter =
+                SqsSinkElementConverter.<String>builder()
+                        .setSerializationSchema(new OpenCheckingStringSchema())
+                        .build();
+
+        String testString = "{many hands make light work;";
+
+        SendMessageBatchRequestEntry serializedRecord = 
elementConverter.apply(testString, null);
+        byte[] serializedString = (new 
OpenCheckingStringSchema()).serialize(testString);
+        assertThat(serializedRecord.messageBody())
+                .isEqualTo(new String(serializedString, 
StandardCharsets.UTF_8));
+    }
+
+    private static class OpenCheckingStringSchema extends SimpleStringSchema {
+
+        private boolean isOpen = false;
+
+        @Override
+        public void open(SerializationSchema.InitializationContext context) 
throws Exception {
+            super.open(context);
+            isOpen = true;
+        }
+
+        public Boolean isOpen() {
+            return isOpen;
+        }
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkTest.java
 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkTest.java
new file mode 100644
index 0000000..b0a361c
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+
+/** Covers construction, defaults and sanity checking of {@link SqsSink}. */
+class SqsSinkTest {
+
+    private static final ElementConverter<String, 
SendMessageBatchRequestEntry> elementConverter =
+            SqsSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @Test
+    void sqsUrlMustNotBeNull() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                new SqsSink<>(
+                                        elementConverter,
+                                        10,
+                                        50,
+                                        5000,
+                                        256000L,
+                                        5000L,
+                                        256 * 1000L,
+                                        false,
+                                        null,
+                                        new Properties()))
+                .withMessageContaining(
+                        "The sqs url must not be null when initializing the 
SQS Sink.");
+    }
+
+    @Test
+    void sqsUrlMustNotBeEmpty() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new SqsSink<>(
+                                        elementConverter,
+                                        500,
+                                        16,
+                                        10000,
+                                        4 * 1024 * 1024L,
+                                        5000L,
+                                        1000 * 1024L,
+                                        false,
+                                        "",
+                                        new Properties()))
+                .withMessageContaining("The sqs url must be set when 
initializing the SQS Sink.");
+    }
+
+    @Test
+    void sqsMaxBatchSizeMustNotBeGreaterThan10() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new SqsSink<>(
+                                        elementConverter,
+                                        100,
+                                        50,
+                                        5000,
+                                        256000L,
+                                        5000L,
+                                        256 * 1000L,
+                                        false,
+                                        "testSqlUrl",
+                                        new Properties()))
+                .withMessageContaining("The sqs MaxBatchSize must not be 
greater than 10.");
+    }
+
+    @Test
+    void sqsSinkFailsWhenAccessKeyIdIsNotProvided() {
+        Properties properties = createConfig("https://non-exisitent-location";);
+        properties.setProperty(
+                AWS_CREDENTIALS_PROVIDER, 
AWSConfigConstants.CredentialProvider.BASIC.toString());
+        
properties.remove(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER));
+        
sqsSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+                properties, "Please set values for AWS Access Key ID");
+    }
+
+    @Test
+    void sqsSinkFailsWhenRegionIsNotProvided() {
+        Properties properties = createConfig("https://non-exisitent-location";);
+        properties.remove(AWS_REGION);
+        
sqsSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+                properties, "region must not be null.");
+    }
+
+    @Test
+    void sqsSinkFailsWhenUnableToConnectToRemoteService() {
+        Properties properties = createConfig("https://non-exisitent-location";);
+        properties.remove(TRUST_ALL_CERTIFICATES);
+        
sqsSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+                properties,
+                "Received an UnknownHostException when attempting to interact 
with a service.");
+    }
+
+    private void 
sqsSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+            Properties properties, String errorMessage) {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        SqsSink<String> sqsSink =
+                SqsSink.<String>builder()
+                        .setSqsSinkElementConverter(
+                                SqsSinkElementConverter.<String>builder()
+                                        .setSerializationSchema(new 
SimpleStringSchema())
+                                        .build())
+                        .setSqsUrl("sqs-url")
+                        .setMaxBatchSize(5)
+                        .setSqsClientProperties(properties)
+                        .build();
+
+        SqsTestUtils.getSampleDataGenerator(env, 10).sinkTo(sqsSink);
+
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(() -> env.execute("Integration Test"))
+                .havingCause()
+                .havingCause()
+                .withMessageContaining(errorMessage);
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
new file mode 100644
index 0000000..38c25f9
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchRequestTooLongException;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Covers construction, defaults and sanity checking of {@link 
SqsSinkWriter}. */
+public class SqsSinkWriterTest {
+
+    private SqsSinkWriter<String> sinkWriter;
+
+    private static final ElementConverter<String, SendMessageBatchRequestEntry>
+            ELEMENT_CONVERTER_PLACEHOLDER =
+                    SqsSinkElementConverter.<String>builder()
+                            .setSerializationSchema(new SimpleStringSchema())
+                            .build();
+
+    @Test
+    public void testNonRetryableExceptionWhenFailOnErrorFalseWillNotRetry() 
throws IOException {
+        Optional<Exception> exceptionToThrow = 
getGenericNonRetryableException();
+        ThrowingSqsAsyncClient sqsAsyncClient = new 
ThrowingSqsAsyncClient(exceptionToThrow);
+        TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+                new TestSqsAsyncClientProvider(sqsAsyncClient);
+        sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+        CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
+                new CompletableFuture<>();
+        Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
+                failedRequests::complete;
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
failedRequestConsumer);
+        assertThat(failedRequests).isNotCompleted();
+    }
+
+    @Test
+    public void testRetryableExceptionWhenFailOnErrorTrueWillNotRetry() throws 
IOException {
+        Optional<Exception> exceptionToThrow = getGenericRetryableException();
+        ThrowingSqsAsyncClient sqsAsyncClient = new 
ThrowingSqsAsyncClient(exceptionToThrow);
+        TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+                new TestSqsAsyncClientProvider(sqsAsyncClient);
+        sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider);
+
+        CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
+                new CompletableFuture<>();
+        Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
+                failedRequests::complete;
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
failedRequestConsumer);
+        assertThat(failedRequests).isNotCompleted();
+    }
+
+    @Test
+    public void testRetryableExceptionWhenFailOnErrorFalseWillRetry() throws 
IOException {
+        Optional<Exception> exceptionToThrow = getGenericRetryableException();
+        ThrowingSqsAsyncClient sqsAsyncClient = new 
ThrowingSqsAsyncClient(exceptionToThrow);
+        TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+                new TestSqsAsyncClientProvider(sqsAsyncClient);
+        sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+        CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
+                new CompletableFuture<>();
+        Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
+                failedRequests::complete;
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
failedRequestConsumer);
+        assertThat(failedRequests).isCompleted();
+    }
+
+    @Test
+    public void testSubmitRequestEntriesWithNoException() throws IOException {
+        TrackingSqsAsyncClient sqsAsyncClient = new TrackingSqsAsyncClient();
+        TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+                new TestSqsAsyncClientProvider(sqsAsyncClient);
+        sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+        CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new 
CompletableFuture<>();
+        Consumer<List<SendMessageBatchRequestEntry>> requestResult = 
requests::complete;
+
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
requestResult);
+
+        List<SendMessageBatchRequestEntry> result = requests.join();
+        Assertions.assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void 
testSubmitRequestEntriesWithPartialSuccessWithFailOnErrorFalseWillRetry()
+            throws IOException {
+        FailedSqsAsyncClient sqsAsyncClient = new 
FailedSqsAsyncClient("testId2");
+        TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+                new TestSqsAsyncClientProvider(sqsAsyncClient);
+        sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+        CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new 
CompletableFuture<>();
+        Consumer<List<SendMessageBatchRequestEntry>> requestResult = 
requests::complete;
+
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
requestResult);
+
+        List<SendMessageBatchRequestEntry> result = requests.join();
+        Assertions.assertEquals(1, result.size());
+    }
+
+    @Test
+    public void
+            
testSubmitRequestEntriesWithPartialSuccessWithFailOnErrorFalseAndFailedIdDoesNotMatchWillNotRetry()
+                    throws IOException {
+        FailedSqsAsyncClient sqsAsyncClient = new 
FailedSqsAsyncClient("invalidFailedRequestId");
+        TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+                new TestSqsAsyncClientProvider(sqsAsyncClient);
+        sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+        CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new 
CompletableFuture<>();
+        Consumer<List<SendMessageBatchRequestEntry>> requestResult = 
requests::complete;
+
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
requestResult);
+        assertThat(requests).isNotCompleted();
+    }
+
+    @Test
+    public void 
testSubmitRequestEntriesWithPartialSuccessWithFailOnErrorTrueWillNotRetry()
+            throws IOException {
+
+        FailedSqsAsyncClient sqsAsyncClient = new 
FailedSqsAsyncClient("testId2");
+        TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+                new TestSqsAsyncClientProvider(sqsAsyncClient);
+        sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider);
+
+        CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new 
CompletableFuture<>();
+        Consumer<List<SendMessageBatchRequestEntry>> requestResult = 
requests::complete;
+
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
requestResult);
+        assertThat(requests).isNotCompleted();
+    }
+
+    @Test
+    public void testClientClosesWhenWriterIsClosed() throws IOException {
+        TrackingSqsAsyncClient sqsAsyncClient = new TrackingSqsAsyncClient();
+        TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+                new TestSqsAsyncClientProvider(sqsAsyncClient);
+        sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+        sinkWriter.close();
+        assertThat(testSqsAsyncClientProvider.getCloseCount()).isEqualTo(1);
+    }
+
+    @Test
+    void testGetSizeInBytesReturnsSizeOfBlob() throws IOException {
+        TrackingSqsAsyncClient sqsAsyncClient = new TrackingSqsAsyncClient();
+        TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+                new TestSqsAsyncClientProvider(sqsAsyncClient);
+        sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+        String testString = "{many hands make light work;";
+        SendMessageBatchRequestEntry record =
+                
SendMessageBatchRequestEntry.builder().messageBody(testString).build();
+        assertThat(sinkWriter.getSizeInBytes(record))
+                .isEqualTo(testString.getBytes(StandardCharsets.UTF_8).length);
+    }
+
+    @Test
+    void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
+            throws IOException, InterruptedException {
+        TestSinkInitContext ctx = new TestSinkInitContext();
+        SqsSink<String> sqsSink =
+                new SqsSink<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        10,
+                        16,
+                        10000,
+                        4 * 1024 * 1024L,
+                        5000L,
+                        1000 * 1024L,
+                        true,
+                        "test-stream",
+                        
AWSServicesTestUtils.createConfig("https://localhost";));
+        SinkWriter<String> writer = sqsSink.createWriter(ctx);
+
+        for (int i = 0; i < 12; i++) {
+            writer.write("data_bytes", null);
+        }
+        assertThatExceptionOfType(CompletionException.class)
+                .isThrownBy(() -> writer.flush(true))
+                .withCauseInstanceOf(SdkClientException.class)
+                .withMessageContaining(
+                        "Unable to execute HTTP request: Connection refused: 
localhost/127.0.0.1:443");
+        
assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(10);
+        
assertThat(ctx.metricGroup().getNumRecordsSendErrorsCounter().getCount()).isEqualTo(10);
+    }
+
+    private SqsSinkWriter<String> getSqsSinkWriter(
+            final boolean failOnError, TestSqsAsyncClientProvider 
testSqsAsyncClientProvider)
+            throws IOException {
+        TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+        Properties sinkProperties = 
AWSServicesTestUtils.createConfig("https://fake_aws_endpoint";);
+        SqsSink<String> sink =
+                new SqsSink<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        10,
+                        16,
+                        10000,
+                        4 * 1024 * 1024L,
+                        5000L,
+                        1000 * 1024L,
+                        failOnError,
+                        
"https://sqs.us-east-2.amazonaws.com/618277569814/fake-sqs";,
+                        sinkProperties);
+        sink.setSqsAsyncClientProvider(testSqsAsyncClientProvider);
+        SqsSinkWriter sqsSinkWriter = (SqsSinkWriter<String>) 
sink.createWriter(sinkInitContext);
+        return sqsSinkWriter;
+    }
+
+    private List<SendMessageBatchRequestEntry> getDefaultInputRequests() {
+        return Arrays.asList(
+                sinkSendMessageBatchRequestEntry("test1", "testId1"),
+                sinkSendMessageBatchRequestEntry("test2", "testId2"));
+    }
+
+    private SendMessageBatchRequestEntry sinkSendMessageBatchRequestEntry(
+            final String messageBody, final String id) {
+        return 
SendMessageBatchRequestEntry.builder().id(id).messageBody(messageBody).build();
+    }
+
+    private Optional<Exception> getGenericRetryableException() {
+        return Optional.of(
+                BatchRequestTooLongException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        
.errorCode("SomeErrorCodeThatIsNotUsed")
+                                        .build())
+                        .build());
+    }
+
+    private Optional<Exception> getGenericNonRetryableException() {
+        return Optional.of(
+                ResourceNotFoundException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        
.errorCode("SomeErrorCodeThatIsNotUsed")
+                                        .build())
+                        .build());
+    }
+
+    private static class ThrowingSqsAsyncClient<T extends Throwable> 
implements SqsAsyncClient {
+
+        private final Optional<T> errorToReturn;
+
+        private ThrowingSqsAsyncClient(Optional<T> errorToReturn) {
+            this.errorToReturn = errorToReturn;
+        }
+
+        @Override
+        public String serviceName() {
+            return "SQS";
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public CompletableFuture<SendMessageBatchResponse> sendMessageBatch(
+                SendMessageBatchRequest sendMessageBatchRequest) {
+            CompletableFuture<SendMessageBatchResponse> future = new 
CompletableFuture<>();
+            future.completeExceptionally(
+                    
SqsException.builder().cause((errorToReturn.get())).build());
+            return future;
+        }
+    }
+
+    private static class TrackingSqsAsyncClient implements SqsAsyncClient {
+
+        @Override
+        public String serviceName() {
+            return "SQS";
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public CompletableFuture<SendMessageBatchResponse> sendMessageBatch(
+                SendMessageBatchRequest sendMessageBatchRequest) {
+            SendMessageBatchResultEntry sendMessageBatchResultEntry =
+                    SendMessageBatchResultEntry.builder().build();
+            List<SendMessageBatchResultEntry> sendMessageBatchResultEntryList 
= new ArrayList<>();
+            sendMessageBatchResultEntryList.add(sendMessageBatchResultEntry);
+            return CompletableFuture.completedFuture(
+                    SendMessageBatchResponse.builder()
+                            .failed(Collections.emptyList())
+                            .successful(sendMessageBatchResultEntryList)
+                            .build());
+        }
+    }
+
+    private static class FailedSqsAsyncClient implements SqsAsyncClient {
+
+        private final String failedRequestId;
+
+        private FailedSqsAsyncClient(String failedRequestId) {
+            this.failedRequestId = failedRequestId;
+        }
+
+        @Override
+        public String serviceName() {
+            return "SQS";
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public CompletableFuture<SendMessageBatchResponse> sendMessageBatch(
+                SendMessageBatchRequest sendMessageBatchRequest) {
+            BatchResultErrorEntry batchResultErrorEntry =
+                    
BatchResultErrorEntry.builder().id(failedRequestId).build();
+            List<BatchResultErrorEntry> batchResultErrorEntryList = new 
ArrayList<>();
+            batchResultErrorEntryList.add(batchResultErrorEntry);
+
+            SendMessageBatchResultEntry sendMessageBatchResultEntry =
+                    
SendMessageBatchResultEntry.builder().id("testId1").build();
+            List<SendMessageBatchResultEntry> sendMessageBatchResultEntryList 
= new ArrayList<>();
+            sendMessageBatchResultEntryList.add(sendMessageBatchResultEntry);
+            return CompletableFuture.completedFuture(
+                    SendMessageBatchResponse.builder()
+                            .failed(batchResultErrorEntryList)
+                            .successful(sendMessageBatchResultEntryList)
+                            .build());
+        }
+    }
+
+    private static class TestSqsAsyncClientProvider implements 
SdkClientProvider<SqsAsyncClient> {
+
+        private final SqsAsyncClient sqsAsyncClient;
+        private int closeCount = 0;
+
+        private TestSqsAsyncClientProvider(SqsAsyncClient sqsAsyncClient) {
+            this.sqsAsyncClient = sqsAsyncClient;
+        }
+
+        @Override
+        public SqsAsyncClient getClient() {
+            return sqsAsyncClient;
+        }
+
+        @Override
+        public void close() {
+            closeCount++;
+        }
+
+        public int getCloseCount() {
+            return closeCount;
+        }
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsStateSerializerTest.java
 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsStateSerializerTest.java
new file mode 100644
index 0000000..48b4ff7
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsStateSerializerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;
+
+/** Test class for {@link SqsStateSerializer}. */
+class SqsStateSerializerTest {
+
+    private static final ElementConverter<String, 
SendMessageBatchRequestEntry> ELEMENT_CONVERTER =
+            SqsSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @Test
+    void testSerializeAndDeserialize() throws IOException {
+        BufferedRequestState<SendMessageBatchRequestEntry> expectedState =
+                getTestState(ELEMENT_CONVERTER, this::getRequestSize);
+
+        SqsStateSerializer serializer = new SqsStateSerializer();
+
+        BufferedRequestState<SendMessageBatchRequestEntry> actualState =
+                serializer.deserialize(1, serializer.serialize(expectedState));
+
+        assertThatBufferStatesAreEqual(actualState, expectedState);
+    }
+
+    private int getRequestSize(SendMessageBatchRequestEntry requestEntry) {
+        return requestEntry.messageBody().length();
+    }
+
+    private <T extends Serializable> void assertThatBufferStatesAreEqual(
+            BufferedRequestState<SendMessageBatchRequestEntry> actual,
+            BufferedRequestState<SendMessageBatchRequestEntry> expected) {
+        
Assertions.assertThat(actual.getStateSize()).isEqualTo(expected.getStateSize());
+        int actualLength = actual.getBufferedRequestEntries().size();
+        
Assertions.assertThat(actualLength).isEqualTo(expected.getBufferedRequestEntries().size());
+        List<RequestEntryWrapper<SendMessageBatchRequestEntry>> actualRequests 
=
+                actual.getBufferedRequestEntries();
+        List<RequestEntryWrapper<SendMessageBatchRequestEntry>> 
expectedRequests =
+                expected.getBufferedRequestEntries();
+
+        for (int i = 0; i < actualLength; ++i) {
+            
Assertions.assertThat((actualRequests.get(i)).getRequestEntry().messageBody())
+                    
.isEqualTo((expectedRequests.get(i)).getRequestEntry().messageBody());
+            
Assertions.assertThat((actualRequests.get(i)).getRequestEntry().id())
+                    
.isEqualTo((expectedRequests.get(i)).getRequestEntry().id());
+            
Assertions.assertThat((actualRequests.get(i)).getRequestEntry().id()).isNotNull();
+        }
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/testutils/SqsTestUtils.java
 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/testutils/SqsTestUtils.java
new file mode 100644
index 0000000..d7e69b1
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/testutils/SqsTestUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the 
Localstack container.
+ */
+public class SqsTestUtils {
+
+    private static final ObjectMapper MAPPER = createObjectMapper();
+
+    public static SqsClient createSqsClient(String endpoint, SdkHttpClient 
httpClient) {
+        return AWSServicesTestUtils.createAwsSyncClient(endpoint, httpClient, 
SqsClient.builder());
+    }
+
+    public static DataStream<String> getSampleDataGenerator(
+            StreamExecutionEnvironment env, int endValue) {
+        return env.fromSequence(1, endValue)
+                .map(Object::toString)
+                .returns(String.class)
+                .map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", 
data)));
+    }
+
+    public static void createSqs(String sqsName, SqsClient sqsClient) {
+        CreateQueueRequest createQueueRequest =
+                CreateQueueRequest.builder().queueName(sqsName).build();
+
+        sqsClient.createQueue(createQueueRequest);
+    }
+
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper;
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/resources/archunit.properties
 
b/flink-connector-aws/flink-connector-sqs/src/test/resources/archunit.properties
new file mode 100644
index 0000000..15be88c
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/resources/archunit.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# By default we allow removing existing violations, but fail when new 
violations are added.
+freeze.store.default.allowStoreUpdate=true
+
+# Enable this if a new (frozen) rule has been added in order to create the 
initial store and record the existing violations.
+#freeze.store.default.allowStoreCreation=true
+
+# Enable this to add allow new violations to be recorded.
+# NOTE: Adding new violations should be avoided when possible. If the rule was 
correct to flag a new
+#       violation, please try to avoid creating the violation. If the 
violation was created due to a
+#       shortcoming of the rule, file a JIRA issue so the rule can be improved.
+#freeze.refreeze=true
+
+freeze.store.default.path=archunit-violations
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/test/resources/log4j2-test.properties
 
b/flink-connector-aws/flink-connector-sqs/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..c4fa187
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-sqs/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-aws/pom.xml b/flink-connector-aws/pom.xml
index 4ac5b80..678a279 100644
--- a/flink-connector-aws/pom.xml
+++ b/flink-connector-aws/pom.xml
@@ -18,8 +18,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0";
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
 
     <modelVersion>4.0.0</modelVersion>
 
@@ -38,6 +38,7 @@ under the License.
         <module>flink-connector-aws-kinesis-firehose</module>
         <module>flink-connector-aws-kinesis-streams</module>
         <module>flink-connector-kinesis</module>
+        <module>flink-connector-sqs</module>
 
         <module>flink-sql-connector-dynamodb</module>
         <module>flink-sql-connector-aws-kinesis-firehose</module>
@@ -45,4 +46,4 @@ under the License.
         <module>flink-sql-connector-kinesis</module>
     </modules>
 
-</project>
+</project>
\ No newline at end of file

Reply via email to