This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new cc0b4d4 [FLINK-35305][Connector/SQS] Amazon SQS Sink Connector
cc0b4d4 is described below
commit cc0b4d4b8e00feabe4e09cd9a24c8389d5bda04e
Author: Priya Dhingra <[email protected]>
AuthorDate: Fri May 10 14:30:50 2024 -0700
[FLINK-35305][Connector/SQS] Amazon SQS Sink Connector
---
docs/content.zh/docs/connectors/datastream/sqs.md | 134 +++++++
docs/content/docs/connectors/datastream/sqs.md | 132 +++++++
.../flink-connector-aws-sqs-e2e-tests/pom.xml | 108 ++++++
.../connector/sqs/sink/test/SqsSinkITTest.java | 161 +++++++++
.../src/test/resources/log4j2-test.properties | 28 ++
flink-connector-aws-e2e-tests/pom.xml | 1 +
.../54da9a7d-14d2-4632-a045-1dd8fc665c8f | 0
.../a6cbd99c-b115-447a-8f19-43c1094db549 | 0
.../archunit-violations/stored.rules | 4 +
flink-connector-aws/flink-connector-sqs/pom.xml | 136 +++++++
.../connector/sqs/sink/SqsConfigConstants.java | 39 ++
.../sqs/sink/SqsExceptionClassifiers.java | 61 ++++
.../apache/flink/connector/sqs/sink/SqsSink.java | 158 ++++++++
.../flink/connector/sqs/sink/SqsSinkBuilder.java | 160 +++++++++
.../sqs/sink/SqsSinkElementConverter.java | 89 +++++
.../flink/connector/sqs/sink/SqsSinkException.java | 59 +++
.../flink/connector/sqs/sink/SqsSinkWriter.java | 235 ++++++++++++
.../connector/sqs/sink/SqsStateSerializer.java | 70 ++++
.../sqs/sink/client/SdkClientProvider.java | 36 ++
.../sqs/sink/client/SqsAsyncClientProvider.java | 64 ++++
.../src/main/resources/log4j2.properties | 25 ++
.../architecture/TestCodeArchitectureTest.java | 40 +++
.../sqs/sink/SqsExceptionClassifiersTest.java | 87 +++++
.../connector/sqs/sink/SqsSinkBuilderTest.java | 48 +++
.../sqs/sink/SqsSinkElementConverterTest.java | 72 ++++
.../flink/connector/sqs/sink/SqsSinkTest.java | 153 ++++++++
.../connector/sqs/sink/SqsSinkWriterTest.java | 396 +++++++++++++++++++++
.../connector/sqs/sink/SqsStateSerializerTest.java | 80 +++++
.../connector/sqs/sink/testutils/SqsTestUtils.java | 60 ++++
.../src/test/resources/archunit.properties | 31 ++
.../src/test/resources/log4j2-test.properties | 28 ++
flink-connector-aws/pom.xml | 7 +-
32 files changed, 2699 insertions(+), 3 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/sqs.md
b/docs/content.zh/docs/connectors/datastream/sqs.md
new file mode 100644
index 0000000..06c99db
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/sqs.md
@@ -0,0 +1,134 @@
+---
+title: SQS
+weight: 5
+type: docs
+aliases:
+ - /zh/dev/connectors/sqs.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS
v2 SDK for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
Follow the instructions from the [Amazon SQS Developer
Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS message queue.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-sqs sqs >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key");
+
+// Optional, use following if you want to provide access via AssumeRole,
Please make sure given IAM role has "sqs:SendMessage" permission
+sinkProperties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
"ASSUME_ROLE");
+sinkProperties.setProperty(AWSConfigConstants.AWS_ROLE_ARN,
"replace-this-with-IAMRole-arn");
+sinkProperties.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME,
"any-session-name-string");
+
+SqsSink<String> sqsSink =
+ SqsSink.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
// Required
+
.setSqsUrl("https://sqs.us-east-1.amazonaws.com/xxxx/test-sqs") // Required
+ .setSqsClientProperties(sinkProperties)
// Required
+ .setFailOnError(false)
// Optional
+ .setMaxBatchSize(10)
// Optional
+ .setMaxInFlightRequests(50)
// Optional
+ .setMaxBufferedRequests(1_000)
// Optional
+ .setMaxBatchSizeInBytes(256 * 1024) //
Optional
+ .setMaxTimeInBufferMS(5000)
// Optional
+ .setMaxRecordSizeInBytes(256 * 1024)
// Optional
+ .build();
+
+flinkStream.sinkTo(sqsSink)
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key")
+
+val SqsSink<String> sqsSink =
+ SqsSink.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
// Required
+
.setSqsUrl("https://sqs.us-east-1.amazonaws.com/xxxx/test-sqs") // Required
+ .setSqsClientProperties(sinkProperties)
// Required
+ .setFailOnError(false)
// Optional
+ .setMaxBatchSize(10)
// Optional
+ .setMaxInFlightRequests(50)
// Optional
+ .setMaxBufferedRequests(1_000)
// Optional
+ .setMaxBatchSizeInBytes(256 * 1024)
// Optional
+ .setMaxTimeInBufferMS(5000)
// Optional
+ .setMaxRecordSizeInBytes(256 * 1024)
// Optional
+ .build();
+
+
+flinkStream.sinkTo(sqsSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's SQS sink is created by using the static builder
`SqsSink.<String>builder()`.
+
+1. __setSqsClientProperties(Properties sinkProperties)__
+ * Required.
+ * Supplies credentials, region and other parameters to the SQS client.
+2. __setSerializationSchema(SerializationSchema<InputType>
serializationSchema)__
+ * Required.
+ * Supplies a serialization schema to the Sink. This schema is used to
serialize elements before sending to SQS.
+3. __setSqsUrl(String sqsUrl)__
+ * Required.
+ * Url of the SQS to sink to.
+4. _setFailOnError(boolean failOnError)_
+ * Optional. Default: `false`.
+ * Whether failed requests to write records to SQS are treated as fatal
exceptions in the sink that cause a Flink Job to restart
+5. _setMaxBatchSize(int maxBatchSize)_
+ * Optional. Default: `10`.
+ * Maximum size of a batch to write to SQS.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+ * Optional. Default: `50`.
+ * The maximum number of in flight requests allowed before the sink applies
backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+ * Optional. Default: `5_000`.
+ * The maximum number of records that may be buffered in the sink before
backpressure is applied.
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+ * Optional. Default: `256 * 1024`.
+ * The maximum size (in bytes) a batch may become. All batches sent will be
smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+ * Optional. Default: `5000`.
+ * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+ * Optional. Default: `256 * 1024`.
+ * The maximum record size that the sink will accept, records larger than
this will be automatically rejected.
+11. _build()_
+ * Constructs and returns the SQS sink.
+
diff --git a/docs/content/docs/connectors/datastream/sqs.md
b/docs/content/docs/connectors/datastream/sqs.md
new file mode 100644
index 0000000..a3648f9
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/sqs.md
@@ -0,0 +1,132 @@
+---
+title: SQS
+weight: 5
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS
v2 SDK for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
Follow the instructions from the [Amazon SQS Developer
Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS message queue.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-sqs sqs >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key");
+
+// Optional, use following if you want to provide access via AssumeRole,
Please make sure given IAM role has "sqs:SendMessage" permission
+sinkProperties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
"ASSUME_ROLE");
+sinkProperties.setProperty(AWSConfigConstants.AWS_ROLE_ARN,
"replace-this-with-IAMRole-arn");
+sinkProperties.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME,
"any-session-name-string");
+
+SqsSink<String> sqsSink =
+ SqsSink.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
// Required
+
.setSqsUrl("https://sqs.us-east-1.amazonaws.com/xxxx/test-sqs") // Required
+ .setSqsClientProperties(sinkProperties)
// Required
+ .setFailOnError(false)
// Optional
+ .setMaxBatchSize(10)
// Optional
+ .setMaxInFlightRequests(50)
// Optional
+ .setMaxBufferedRequests(1_000)
// Optional
+ .setMaxBatchSizeInBytes(256 * 1024) //
Optional
+ .setMaxTimeInBufferMS(5000)
// Optional
+ .setMaxRecordSizeInBytes(256 * 1024)
// Optional
+ .build();
+
+flinkStream.sinkTo(sqsSink)
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key")
+
+val SqsSink<String> sqsSink =
+ SqsSink.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
// Required
+
.setSqsUrl("https://sqs.us-east-1.amazonaws.com/xxxx/test-sqs") // Required
+ .setSqsClientProperties(sinkProperties)
// Required
+ .setFailOnError(false)
// Optional
+ .setMaxBatchSize(10)
// Optional
+ .setMaxInFlightRequests(50)
// Optional
+ .setMaxBufferedRequests(1_000)
// Optional
+ .setMaxBatchSizeInBytes(256 * 1024)
// Optional
+ .setMaxTimeInBufferMS(5000)
// Optional
+ .setMaxRecordSizeInBytes(256 * 1024)
// Optional
+ .build();
+
+
+flinkStream.sinkTo(sqsSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's SQS sink is created by using the static builder
`SqsSink.<String>builder()`.
+
+1. __setSqsClientProperties(Properties sinkProperties)__
+ * Required.
+ * Supplies credentials, region and other parameters to the SQS client.
+2. __setSerializationSchema(SerializationSchema<InputType>
serializationSchema)__
+ * Required.
+ * Supplies a serialization schema to the Sink. This schema is used to
serialize elements before sending to SQS.
+3. __setSqsUrl(String sqsUrl)__
+ * Required.
+ * Url of the SQS to sink to.
+4. _setFailOnError(boolean failOnError)_
+ * Optional. Default: `false`.
+ * Whether failed requests to write records to SQS are treated as fatal
exceptions in the sink that cause a Flink Job to restart
+5. _setMaxBatchSize(int maxBatchSize)_
+ * Optional. Default: `10`.
+ * Maximum size of a batch to write to SQS.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+ * Optional. Default: `50`.
+ * The maximum number of in flight requests allowed before the sink applies
backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+ * Optional. Default: `5_000`.
+ * The maximum number of records that may be buffered in the sink before
backpressure is applied.
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+ * Optional. Default: `256 * 1024`.
+ * The maximum size (in bytes) a batch may become. All batches sent will be
smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+ * Optional. Default: `5000`.
+ * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+* Optional. Default: `256 * 1024`.
+* The maximum record size that the sink will accept, records larger than this
will be automatically rejected.
+11. _build()_
+* Constructs and returns the SQS sink.
+
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml
new file mode 100644
index 0000000..3259a6f
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>4.4-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>
+ <name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-sqs</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-base</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-sqs</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <!-- Other third-party dependencies -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <dependencyManagement>
+ <dependencies>
+
+ <!-- Overridden aws-sdk dependency to older version to temporarily
fix 'not able to create sqs localstack error with newer version'-->
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>bom</artifactId>
+ <version>2.20.144</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+</project>
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java
new file mode 100644
index 0000000..97a4e16
--- /dev/null
+++
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.SqsSinkElementConverter;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SqsSinkITTest.class);
+
+ private static final int NUMBER_OF_ELEMENTS = 50;
+ private StreamExecutionEnvironment env;
+ private SdkHttpClient httpClient;
+ private SqsClient sqsClient;
+ private static final Network network = Network.newNetwork();
+
+ @ClassRule
+ public static LocalstackContainer mockSqsContainer =
+ new
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+ .withNetwork(network)
+ .withNetworkAliases("localstack");
+
+ public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+ TestcontainersSettings.builder()
+ .environmentVariable("AWS_CBOR_DISABLE", "1")
+ .environmentVariable(
+ "FLINK_ENV_JAVA_OPTS",
+
"-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking
-Daws.cborEnabled=false")
+ .network(network)
+ .logger(LOG)
+ .dependsOn(mockSqsContainer)
+ .build();
+
+ public static final FlinkContainers FLINK =
+
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+ @Before
+ public void setup() throws Exception {
+ httpClient = AWSServicesTestUtils.createHttpClient();
+ sqsClient = createSqsClient(mockSqsContainer.getEndpoint(),
httpClient);
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ LOG.info("Done setting up the localstack.");
+ }
+
+ @BeforeClass
+ public static void setupFlink() throws Exception {
+ FLINK.start();
+ }
+
+ @AfterClass
+ public static void stopFlink() {
+ FLINK.stop();
+ }
+
+ @After
+ public void teardown() {
+ System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+ httpClient.close();
+ sqsClient.close();
+ }
+
+ @Test
+ public void sqsSinkWritesCorrectDataToMockAWSServices() throws Exception {
+ LOG.info("1 - Creating the SQS");
+ SqsTestUtils.createSqs("test-sqs", sqsClient);
+
+ SqsSink<String> sqsSink =
+ SqsSink.<String>builder()
+ .setSqsSinkElementConverter(
+ SqsSinkElementConverter.<String>builder()
+ .setSerializationSchema(new
SimpleStringSchema())
+ .build())
+ .setSqsUrl("http://localhost:4576/queue/test-sqs")
+
.setSqsClientProperties(createConfig(mockSqsContainer.getEndpoint()))
+ .build();
+
+ SqsTestUtils.getSampleDataGenerator(env,
NUMBER_OF_ELEMENTS).sinkTo(sqsSink);
+ env.execute("Integration Test");
+ List<Message> messages = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ // Read data from SQS and validate
+ ReceiveMessageRequest receiveMessageRequest =
+ ReceiveMessageRequest.builder()
+ .queueUrl("http://localhost:4576/queue/test-sqs")
+ .maxNumberOfMessages(10) // max 10 can be read at
a time
+ .build();
+
+
messages.addAll(sqsClient.receiveMessage(receiveMessageRequest).messages());
+ }
+
+ // Add assertions here to validate the messages
+ assertEquals(
+ NUMBER_OF_ELEMENTS,
+ messages.size(),
+ "Number of messages received should match the number of
elements sent");
+
+ List<String> sentDataList = new ArrayList<>();
+ SqsTestUtils.getSampleDataGenerator(env, NUMBER_OF_ELEMENTS)
+ .executeAndCollect()
+ .forEachRemaining(sentDataList::add);
+
+ List<String> receivedDataList = new ArrayList<>();
+ for (Message message : messages) {
+ receivedDataList.add(new String(message.body()));
+ }
+
+
Assertions.assertThat(sentDataList.containsAll(receivedDataList)).isTrue();
+ }
+}
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/resources/log4j2-test.properties
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++
b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-aws-e2e-tests/pom.xml
b/flink-connector-aws-e2e-tests/pom.xml
index e291d3d..ee59f73 100644
--- a/flink-connector-aws-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/pom.xml
@@ -41,6 +41,7 @@ under the License.
<module>flink-connector-aws-kinesis-firehose-e2e-tests</module>
<module>flink-connector-aws-kinesis-streams-e2e-tests</module>
<module>flink-connector-kinesis-e2e-tests</module>
+ <module>flink-connector-aws-sqs-e2e-tests</module>
<module>flink-formats-avro-glue-schema-registry-e2e-tests</module>
<module>flink-formats-json-glue-schema-registry-e2e-tests</module>
</modules>
diff --git
a/flink-connector-aws/flink-connector-sqs/archunit-violations/54da9a7d-14d2-4632-a045-1dd8fc665c8f
b/flink-connector-aws/flink-connector-sqs/archunit-violations/54da9a7d-14d2-4632-a045-1dd8fc665c8f
new file mode 100644
index 0000000..e69de29
diff --git
a/flink-connector-aws/flink-connector-sqs/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
b/flink-connector-aws/flink-connector-sqs/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
new file mode 100644
index 0000000..e69de29
diff --git
a/flink-connector-aws/flink-connector-sqs/archunit-violations/stored.rules
b/flink-connector-aws/flink-connector-sqs/archunit-violations/stored.rules
new file mode 100644
index 0000000..cf8b667
--- /dev/null
+++ b/flink-connector-aws/flink-connector-sqs/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Tue Feb 22 12:19:26 CET 2022
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\
ITCase=54da9a7d-14d2-4632-a045-1dd8fc665c8f
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\
extension=a6cbd99c-b115-447a-8f19-43c1094db549
diff --git a/flink-connector-aws/flink-connector-sqs/pom.xml
b/flink-connector-aws/flink-connector-sqs/pom.xml
new file mode 100644
index 0000000..00e3c29
--- /dev/null
+++ b/flink-connector-aws/flink-connector-sqs/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-parent</artifactId>
+ <version>4.4-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-connector-sqs</artifactId>
+ <name>Flink : Connectors : AWS : Amazon SQS</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sqs</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>netty-nio-client</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-base</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ </dependency>
+
+ <!-- ArchUnit test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-architecture-tests-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsConfigConstants.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsConfigConstants.java
new file mode 100644
index 0000000..61e5eb6
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsConfigConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Defaults for {@link SqsSinkWriter}. */
+@PublicEvolving
+public class SqsConfigConstants {
+
+ public static final ConfigOption<String> BASE_SQS_USER_AGENT_PREFIX_FORMAT
=
+ ConfigOptions.key("Apache Flink %s (%s) SQS Connector")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("SQS useragent prefix format.");
+
+ public static final ConfigOption<String> SQS_CLIENT_USER_AGENT_PREFIX =
+ ConfigOptions.key("aws.sqs.client.user-agent-prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("SQS identifier for user agent prefix.");
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiers.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiers.java
new file mode 100644
index 0000000..7f6d39e
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiers.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+/**
+ * Class containing set of {@link FatalExceptionClassifier} for {@link
+ * software.amazon.awssdk.services.sqs.model.SqsException}.
+ */
+@Internal
+public class SqsExceptionClassifiers {
+
+ public static FatalExceptionClassifier
getNotAuthorizedExceptionClassifier() {
+ return AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+ SqsException.class,
+ "NotAuthorized",
+ err ->
+ new SqsSinkException(
+ "Encountered non-recoverable exception:
NotAuthorized", err));
+ }
+
+ public static FatalExceptionClassifier
getAccessDeniedExceptionClassifier() {
+ return AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+ SqsException.class,
+ "AccessDeniedException",
+ err ->
+ new SqsSinkException(
+ "Encountered non-recoverable exception:
AccessDeniedException",
+ err));
+ }
+
+ public static FatalExceptionClassifier
getResourceNotFoundExceptionClassifier() {
+ return FatalExceptionClassifier.withRootCauseOfType(
+ ResourceNotFoundException.class,
+ err ->
+ new SqsSinkException(
+ "Encountered non-recoverable exception
relating to not being able to find the specified resources",
+ err));
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSink.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSink.java
new file mode 100644
index 0000000..eea6109
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSink.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+import org.apache.flink.connector.sqs.sink.client.SqsAsyncClientProvider;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * A SQS Sink that performs async requests against a destination SQS using the
buffering protocol
+ * specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link
software.amazon.awssdk.services.sqs.SqsAsyncClient} to
+ * communicate with the AWS endpoint.
+ *
+ * <p>Please see the writer implementation in {@link SqsSinkWriter}
+ *
+ * @param <InputT> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class SqsSink<InputT> extends AsyncSinkBase<InputT,
SendMessageBatchRequestEntry> {
+
+ private final boolean failOnError;
+ private final String sqsUrl;
+ private final Properties sqsClientProperties;
+ private transient SdkClientProvider<SqsAsyncClient>
asyncClientSdkClientProviderOverride;
+
+ SqsSink(
+ ElementConverter<InputT, SendMessageBatchRequestEntry>
elementConverter,
+ Integer maxBatchSize,
+ Integer maxInFlightRequests,
+ Integer maxBufferedRequests,
+ Long maxBatchSizeInBytes,
+ Long maxTimeInBufferMS,
+ Long maxRecordSizeInBytes,
+ boolean failOnError,
+ String sqsUrl,
+ Properties sqsClientProperties) {
+ super(
+ elementConverter,
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBatchSizeInBytes,
+ maxTimeInBufferMS,
+ maxRecordSizeInBytes);
+ this.sqsUrl =
+ Preconditions.checkNotNull(
+ sqsUrl, "The sqs url must not be null when
initializing the SQS Sink.");
+ Preconditions.checkArgument(
+ !this.sqsUrl.isEmpty(), "The sqs url must be set when
initializing the SQS Sink.");
+
+ Preconditions.checkArgument(
+ (this.getMaxBatchSize() <= 10),
+ "The sqs MaxBatchSize must not be greater than 10.");
+ this.failOnError = failOnError;
+ this.sqsClientProperties = sqsClientProperties;
+ }
+
+ /**
+ * Create a {@link SqsSinkBuilder} to allow the fluent construction of a
new {@code SqsSink}.
+ *
+ * @param <InputT> type of incoming records
+ * @return {@link SqsSinkBuilder}
+ */
+ public static <InputT> SqsSinkBuilder<InputT> builder() {
+ return new SqsSinkBuilder<>();
+ }
+
+ @Override
+ public StatefulSinkWriter<InputT,
BufferedRequestState<SendMessageBatchRequestEntry>>
+ createWriter(InitContext context) throws IOException {
+ return new SqsSinkWriter<>(
+ getElementConverter(),
+ context,
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBatchSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ getMaxRecordSizeInBytes(),
+ failOnError,
+ sqsUrl,
+ getAsyncClientProvider(sqsClientProperties),
+ Collections.emptyList());
+ }
+
+ @Override
+ public StatefulSinkWriter<InputT,
BufferedRequestState<SendMessageBatchRequestEntry>>
+ restoreWriter(
+ InitContext context,
+
Collection<BufferedRequestState<SendMessageBatchRequestEntry>> recoveredState)
+ throws IOException {
+ return new SqsSinkWriter<>(
+ getElementConverter(),
+ context,
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBatchSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ getMaxRecordSizeInBytes(),
+ failOnError,
+ sqsUrl,
+ getAsyncClientProvider(sqsClientProperties),
+ recoveredState);
+ }
+
+ private SdkClientProvider<SqsAsyncClient>
getAsyncClientProvider(Properties clientProperties) {
+ if (asyncClientSdkClientProviderOverride != null) {
+ return asyncClientSdkClientProviderOverride;
+ }
+ return new SqsAsyncClientProvider(clientProperties);
+ }
+
+ @Internal
+ @VisibleForTesting
+ void setSqsAsyncClientProvider(
+ SdkClientProvider<SqsAsyncClient>
asyncClientSdkClientProviderOverride) {
+ this.asyncClientSdkClientProviderOverride =
asyncClientSdkClientProviderOverride;
+ }
+
+ @Override
+ public
SimpleVersionedSerializer<BufferedRequestState<SendMessageBatchRequestEntry>>
+ getWriterStateSerializer() {
+ return new SqsStateSerializer();
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java
new file mode 100644
index 0000000..9db622c
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link
SqsSink} that writes String
+ * values to a SQS named sqsUrl.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink<String> sqsSink =
+ * SqsSink.<String>builder()
+ * .setSqsUrl("sqsUrl")
+ * .setSqsClientProperties(sinkProperties)
+ *
.setSqsSinkElementConverter(SqsSinkElementConverter.<String>builder()
+ * .setSerializationSchema(new
SimpleStringSchema())
+ * .build())
+ * .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following
defaults will be used:
+ *
+ * <ul>
+ * <li>{@code maxBatchSize} will be 10
+ * <li>{@code maxInFlightRequests} will be 50
+ * <li>{@code maxBufferedRequests} will be 5000
+ * <li>{@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ * <li>{@code maxTimeInBufferMs} will be 5000ms
+ * <li>{@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ * <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+ extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry,
SqsSinkBuilder<InputT>> {
+
+ private static final int DEFAULT_MAX_BATCH_SIZE = 10;
+ private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+ private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+ private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000;
+ private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+ private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000;
+ private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+ private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+ private Boolean failOnError;
+ private String sqsUrl;
+ private Properties sqsClientProperties;
+
+ private SqsSinkElementConverter<InputT> sqsSinkElementConverter;
+
+ SqsSinkBuilder() {}
+
+ /**
+ * Sets the url of the SQS that the sink will connect to. There is no
default for this
+ * parameter, therefore, this must be provided at sink creation time
otherwise the build will
+ * fail.
+ *
+ * @param sqsUrl the url of the Sqs
+ * @return {@link SqsSinkBuilder} itself
+ */
+ public SqsSinkBuilder<InputT> setSqsUrl(String sqsUrl) {
+ this.sqsUrl = sqsUrl;
+ return this;
+ }
+
+ public SqsSinkBuilder<InputT> setSqsSinkElementConverter(
+ final SqsSinkElementConverter<InputT> sqsSinkElementConverter) {
+ this.sqsSinkElementConverter = sqsSinkElementConverter;
+ return this;
+ }
+
+ /**
+ * If writing to SQS results in a partial or full failure being returned,
the job will fail
+ * immediately with a {@link SqsSinkException} if failOnError is set.
+ *
+ * @param failOnError whether to fail on error
+ * @return {@link SqsSinkBuilder} itself
+ */
+ public SqsSinkBuilder<InputT> setFailOnError(boolean failOnError) {
+ this.failOnError = failOnError;
+ return this;
+ }
+
+ /**
+ * A set of properties used by the sink to create the SQS client. This may
be used to set the
+ * aws region, credentials etc. See the docs for usage and syntax.
+ *
+ * @param sqsClientProps SQS client properties
+ * @return {@link SqsSinkBuilder} itself
+ */
+ public SqsSinkBuilder<InputT> setSqsClientProperties(final Properties
sqsClientProps) {
+ sqsClientProperties = sqsClientProps;
+ return this;
+ }
+
+ @VisibleForTesting
+ Properties getClientPropertiesWithDefaultHttpProtocol() {
+ Properties clientProperties =
+ Optional.ofNullable(sqsClientProperties).orElse(new
Properties());
+ clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION,
DEFAULT_HTTP_PROTOCOL.toString());
+ return clientProperties;
+ }
+
+ @Override
+ public SqsSink<InputT> build() {
+ return new SqsSink<>(
+ Optional.ofNullable(sqsSinkElementConverter)
+ .orElse(
+ (SqsSinkElementConverter<InputT>)
+
SqsSinkElementConverter.<String>builder()
+ .setSerializationSchema(new
SimpleStringSchema())
+ .build()),
+
Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
+ Optional.ofNullable(getMaxInFlightRequests())
+ .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
+
Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
+
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
+
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
+
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
+ Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR),
+ sqsUrl,
+ getClientPropertiesWithDefaultHttpProtocol());
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverter.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverter.java
new file mode 100644
index 0000000..ba07b46
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS SQS SDK
v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to
transform it into a
+ * {@link SendMessageBatchRequestEntry} that may be persisted.
+ */
+@Internal
+public class SqsSinkElementConverter<InputT>
+ implements ElementConverter<InputT, SendMessageBatchRequestEntry> {
+
+ /** A serialization schema to specify how the input element should be
serialized. */
+ private final SerializationSchema<InputT> serializationSchema;
+
+ private SqsSinkElementConverter(SerializationSchema<InputT>
serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ }
+
+ @Override
+ public SendMessageBatchRequestEntry apply(InputT element,
SinkWriter.Context context) {
+ final byte[] messageBody = serializationSchema.serialize(element);
+ return SendMessageBatchRequestEntry.builder()
+ .id(UUID.randomUUID().toString())
+ .messageBody(new String(messageBody, StandardCharsets.UTF_8))
+ .build();
+ }
+
+ @Override
+ public void open(Sink.InitContext context) {
+ try {
+
serializationSchema.open(context.asSerializationSchemaInitializationContext());
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to initialize
serialization schema.", e);
+ }
+ }
+
+ public static <InputT> Builder<InputT> builder() {
+ return new Builder<>();
+ }
+
+ /** A builder for the SqsSinkElementConverter. */
+ public static class Builder<InputT> {
+
+ private SerializationSchema<InputT> serializationSchema;
+
+ public Builder<InputT> setSerializationSchema(
+ SerializationSchema<InputT> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ return this;
+ }
+
+ public SqsSinkElementConverter<InputT> build() {
+ Preconditions.checkNotNull(
+ serializationSchema,
+ "No SerializationSchema was supplied to the SQS Sink
builder.");
+ return new SqsSinkElementConverter<>(serializationSchema);
+ }
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkException.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkException.java
new file mode 100644
index 0000000..d93305b
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkException.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** A {@link RuntimeException} wrapper indicating the exception was thrown
from the SQS Sink. */
+@PublicEvolving
+class SqsSinkException extends RuntimeException {
+
+ public SqsSinkException(final String message) {
+ super(message);
+ }
+
+ public SqsSinkException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * When the flag {@code failOnError} is set in {@link SqsSinkWriter}, this
exception is raised
+ * as soon as any exception occurs when writing to SQS.
+ */
+ static class SqsFailFastSinkException extends SqsSinkException {
+
+ private static final String ERROR_MESSAGE =
+ "Encountered an exception while persisting records, not
retrying due to {failOnError} being set.";
+
+ public SqsFailFastSinkException() {
+ super(ERROR_MESSAGE);
+ }
+
+ public SqsFailFastSinkException(final String errorMessage) {
+ super(errorMessage);
+ }
+
+ public SqsFailFastSinkException(final String errorMessage, final
Throwable cause) {
+ super(errorMessage, cause);
+ }
+
+ public SqsFailFastSinkException(final Throwable cause) {
+ super(ERROR_MESSAGE, cause);
+ }
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
new file mode 100644
index 0000000..459e068
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More details on the
operation of this
+ * sink writer may be found in the doc for {@link SqsSink}. More details on
the internals of this
+ * sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link SqsAsyncClient} used here may be configured in the standard
way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID}
and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT,
SendMessageBatchRequestEntry> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SqsSinkWriter.class);
+
+ private final SdkClientProvider<SqsAsyncClient> clientProvider;
+
+ private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+ AWSExceptionHandler.withClassifier(
+ FatalExceptionClassifier.createChain(
+ getInterruptedExceptionClassifier(),
+ getInvalidCredentialsExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
+
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+ getSdkClientMisconfiguredExceptionClassifier()));
+
+ private final Counter numRecordsOutErrorsCounter;
+
+ /* Url of SQS */
+ private final String sqsUrl;
+
+ /* The sink writer metric group */
+ private final SinkWriterMetricGroup metrics;
+
+ /* Flag to whether fatally fail any time we encounter an exception when
persisting records */
+ private final boolean failOnError;
+
+ SqsSinkWriter(
+ ElementConverter<InputT, SendMessageBatchRequestEntry>
elementConverter,
+ Sink.InitContext context,
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long maxBatchSizeInBytes,
+ long maxTimeInBufferMS,
+ long maxRecordSizeInBytes,
+ boolean failOnError,
+ String sqsUrl,
+ SdkClientProvider<SqsAsyncClient> clientProvider,
+ Collection<BufferedRequestState<SendMessageBatchRequestEntry>>
initialStates) {
+ super(
+ elementConverter,
+ context,
+ AsyncSinkWriterConfiguration.builder()
+ .setMaxBatchSize(maxBatchSize)
+ .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+ .setMaxInFlightRequests(maxInFlightRequests)
+ .setMaxBufferedRequests(maxBufferedRequests)
+ .setMaxTimeInBufferMS(maxTimeInBufferMS)
+ .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+ .build(),
+ initialStates);
+ this.failOnError = failOnError;
+ this.sqsUrl = sqsUrl;
+ this.metrics = context.metricGroup();
+ this.numRecordsOutErrorsCounter =
metrics.getNumRecordsOutErrorsCounter();
+ this.clientProvider = clientProvider;
+ }
+
+ @Override
+ protected void submitRequestEntries(
+ List<SendMessageBatchRequestEntry> requestEntries,
+ Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+
+ final SendMessageBatchRequest batchRequest =
+
SendMessageBatchRequest.builder().entries(requestEntries).queueUrl(sqsUrl).build();
+
+ CompletableFuture<SendMessageBatchResponse> future =
+ clientProvider.getClient().sendMessageBatch(batchRequest);
+
+ future.whenComplete(
+ (response, err) -> {
+ if (err != null) {
+ handleFullyFailedRequest(err, requestEntries,
requestResult);
+ } else if (response.failed() != null &&
response.failed().size() > 0) {
+ handlePartiallyFailedRequest(
+ response, requestEntries,
requestResult);
+ } else {
+ requestResult.accept(Collections.emptyList());
+ }
+ })
+ .exceptionally(
+ ex -> {
+ getFatalExceptionCons()
+ .accept(
+ new
SqsSinkException.SqsFailFastSinkException(
+ ex.getMessage(), ex));
+ return null;
+ });
+ }
+
+ @Override
+ protected long getSizeInBytes(SendMessageBatchRequestEntry requestEntry) {
+ return
requestEntry.messageBody().getBytes(StandardCharsets.UTF_8).length;
+ }
+
+ @Override
+ public void close() {
+ AWSGeneralUtil.closeResources(clientProvider);
+ }
+
+ private void handleFullyFailedRequest(
+ Throwable err,
+ List<SendMessageBatchRequestEntry> requestEntries,
+ Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+
+ numRecordsOutErrorsCounter.inc(requestEntries.size());
+ boolean isFatal = SQS_EXCEPTION_HANDLER.consumeIfFatal(err,
getFatalExceptionCons());
+ if (isFatal) {
+ return;
+ }
+
+ if (failOnError) {
+ getFatalExceptionCons().accept(new
SqsSinkException.SqsFailFastSinkException(err));
+ return;
+ }
+
+ LOG.warn(
+ "SQS Sink failed to write and will retry {} entries to SQS,
First request was {}",
+ requestEntries.size(),
+ requestEntries.get(0).toString(),
+ err);
+ requestResult.accept(requestEntries);
+ }
+
+ private void handlePartiallyFailedRequest(
+ SendMessageBatchResponse response,
+ List<SendMessageBatchRequestEntry> requestEntries,
+ Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+
+ LOG.warn(
+ "handlePartiallyFailedRequest: SQS Sink failed to write and
will retry {} entries to SQS",
+ response.failed().size());
+ numRecordsOutErrorsCounter.inc(response.failed().size());
+
+ if (failOnError) {
+ getFatalExceptionCons().accept(new
SqsSinkException.SqsFailFastSinkException());
+ return;
+ }
+
+ final List<SendMessageBatchRequestEntry> failedRequestEntries =
+ new ArrayList<>(response.failed().size());
+
+ for (final BatchResultErrorEntry failedEntry : response.failed()) {
+ final Optional<SendMessageBatchRequestEntry> retryEntry =
+ getFailedRecord(requestEntries, failedEntry.id());
+ if (retryEntry.isPresent()) {
+ failedRequestEntries.add(retryEntry.get());
+ } else {
+ LOG.error(
+ "handlePartiallyFailedRequest: SQS Sink failed to
retry unsuccessful SQS publish request due to invalid failed requestId");
+ getFatalExceptionCons()
+ .accept(
+ new SqsSinkException.SqsFailFastSinkException(
+ "SQS Sink failed to retry unsuccessful
SQS publish request due to invalid failed requestId"));
+ return;
+ }
+ }
+
+ requestResult.accept(failedRequestEntries);
+ }
+
+ private Optional<SendMessageBatchRequestEntry> getFailedRecord(
+ List<SendMessageBatchRequestEntry> requestEntries, String
selectedId) {
+ for (SendMessageBatchRequestEntry entry : requestEntries) {
+ if (entry.id().equals(selectedId)) {
+ return Optional.of(entry);
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsStateSerializer.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsStateSerializer.java
new file mode 100644
index 0000000..426409a
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsStateSerializer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/** SQS implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class SqsStateSerializer
+ extends AsyncSinkWriterStateSerializer<SendMessageBatchRequestEntry> {
+ @Override
+ protected void serializeRequestToStream(
+ final SendMessageBatchRequestEntry request, final DataOutputStream
out)
+ throws IOException {
+ out.write(request.messageBody().getBytes(StandardCharsets.UTF_8));
+ serializeRequestId(request.id(), out);
+ }
+
+ protected void serializeRequestId(String requestId, DataOutputStream out)
throws IOException {
+ out.writeInt(requestId.length());
+ out.write(requestId.getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Override
+ protected SendMessageBatchRequestEntry deserializeRequestFromStream(
+ final long requestSize, final DataInputStream in) throws
IOException {
+ final byte[] requestData = new byte[(int) requestSize];
+ in.read(requestData);
+ return SendMessageBatchRequestEntry.builder()
+ .id(deserializeRequestId(in))
+ .messageBody(new String(requestData, StandardCharsets.UTF_8))
+ .build();
+ }
+
+ protected String deserializeRequestId(DataInputStream in) throws
IOException {
+ int requestIdLength = in.readInt();
+ byte[] requestIdData = new byte[(int) requestIdLength];
+ in.read(requestIdData);
+ return new String(requestIdData, StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SdkClientProvider.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SdkClientProvider.java
new file mode 100644
index 0000000..8c3b1ae
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SdkClientProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.client;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
+
+/** Provides a {@link SdkClient}. */
+@Internal
+public interface SdkClientProvider<T extends SdkClient> extends
SdkAutoCloseable {
+
+ /**
+ * Returns {@link T}.
+ *
+ * @return the AWS SDK client
+ */
+ T getClient();
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SqsAsyncClientProvider.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SqsAsyncClientProvider.java
new file mode 100644
index 0000000..dfd6c80
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/client/SqsAsyncClientProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.client;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.sqs.sink.SqsConfigConstants;
+
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+import java.util.Properties;
+
+/** Provides a {@link SqsAsyncClient}. */
+@Internal
+public class SqsAsyncClientProvider implements
SdkClientProvider<SqsAsyncClient> {
+
+ private final SdkAsyncHttpClient httpClient;
+ private final SqsAsyncClient sqsAsyncClient;
+
+ public SqsAsyncClientProvider(Properties clientProperties) {
+ this.httpClient =
AWSGeneralUtil.createAsyncHttpClient(clientProperties);
+ this.sqsAsyncClient = buildClient(clientProperties, httpClient);
+ }
+
+ @Override
+ public SqsAsyncClient getClient() {
+ return sqsAsyncClient;
+ }
+
+ @Override
+ public void close() {
+ AWSGeneralUtil.closeResources(httpClient, sqsAsyncClient);
+ }
+
+ private SqsAsyncClient buildClient(
+ Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+ AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+
+ return AWSClientUtil.createAwsAsyncClient(
+ sqsClientProperties,
+ httpClient,
+ SqsAsyncClient.builder(),
+ SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT.key(),
+ SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX.key());
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties
b/flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..c64a340
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
new file mode 100644
index 0000000..ff6d2f3
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+/** Architecture tests for test code. */
+@AnalyzeClasses(
+ packages = "org.apache.flink.connector.sqs",
+ importOptions = {
+ ImportOption.OnlyIncludeTests.class,
+ ImportOptions.ExcludeScalaImportOption.class,
+ ImportOptions.ExcludeShadedImportOption.class
+ })
+public class TestCodeArchitectureTest {
+
+ @ArchTest
+ public static final ArchTests COMMON_TESTS =
ArchTests.in(TestCodeArchitectureTestBase.class);
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiersTest.java
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiersTest.java
new file mode 100644
index 0000000..4ebb4e2
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsExceptionClassifiersTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+ private final FatalExceptionClassifier classifier =
+ FatalExceptionClassifier.createChain(
+
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+ @Test
+ public void shouldClassifyNotAuthorizedAsFatal() {
+ AwsServiceException sqsException =
+ SqsException.builder()
+ .awsErrorDetails(
+
AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+ .build();
+
+ // isFatal returns `true` if an exception is non-fatal
+ assertFalse(classifier.isFatal(sqsException, ex -> {}));
+ }
+
+ @Test
+ public void shouldClassifyAccessDeniedExceptionAsFatal() {
+ AwsServiceException sqsException =
+ SqsException.builder()
+ .awsErrorDetails(
+ AwsErrorDetails.builder()
+ .errorCode("AccessDeniedException")
+ .build())
+ .build();
+
+ // isFatal returns `true` if an exception is non-fatal
+ assertFalse(classifier.isFatal(sqsException, ex -> {}));
+ }
+
+ @Test
+ public void shouldClassifySocketTimeoutExceptionAsNonFatal() {
+ AwsServiceException sqsException =
+ SqsException.builder()
+ .awsErrorDetails(
+ AwsErrorDetails.builder()
+ .errorCode("SocketTimeoutException")
+ .build())
+ .build();
+
+ // isFatal returns `true` if an exception is non-fatal
+ assertTrue(classifier.isFatal(sqsException, ex -> {}));
+ }
+
+ @Test
+ public void shouldClassifyResourceNotFoundAsFatal() {
+ AwsServiceException sqsException =
ResourceNotFoundException.builder().build();
+
+ // isFatal returns `true` if an exception is non-fatal
+ assertFalse(classifier.isFatal(sqsException, ex -> {}));
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java
new file mode 100644
index 0000000..bc6436a
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+/** Covers construction, defaults and sanity checking of {@link
SqsSinkBuilder}. */
+class SqsSinkBuilderTest {
+
+ @Test
+ void sqsUrlOfSinkMustBeSetWhenBuilt() {
+ Assertions.assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(() -> SqsSink.<String>builder().build())
+ .withMessageContaining(
+ "The sqs url must not be null when initializing the
SQS Sink.");
+ }
+
+ @Test
+ void sqsUrlOfSinkMustBeSetToNonEmptyWhenBuilt() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ SqsSink.<String>builder()
+ .setSqsUrl("")
+ .setSqsClientProperties(new
Properties())
+ .setFailOnError(true)
+ .build())
+ .withMessageContaining("The sqs url must be set when
initializing the SQS Sink.");
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java
new file mode 100644
index 0000000..93d0414
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Covers construction and sanity checking of {@link
SqsSinkElementConverter}. */
+class SqsSinkElementConverterTest {
+
+ @Test
+ void
elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt()
{
+ Assertions.assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(() ->
SqsSinkElementConverter.<String>builder().build())
+ .withMessageContaining(
+ "No SerializationSchema was supplied to the SQS Sink
builder.");
+ }
+
+ @Test
+ void elementConverterUsesProvidedSchemaToSerializeRecord() {
+ ElementConverter<String, SendMessageBatchRequestEntry>
elementConverter =
+ SqsSinkElementConverter.<String>builder()
+ .setSerializationSchema(new OpenCheckingStringSchema())
+ .build();
+
+ String testString = "{many hands make light work;";
+
+ SendMessageBatchRequestEntry serializedRecord =
elementConverter.apply(testString, null);
+ byte[] serializedString = (new
OpenCheckingStringSchema()).serialize(testString);
+ assertThat(serializedRecord.messageBody())
+ .isEqualTo(new String(serializedString,
StandardCharsets.UTF_8));
+ }
+
+ private static class OpenCheckingStringSchema extends SimpleStringSchema {
+
+ private boolean isOpen = false;
+
+ @Override
+ public void open(SerializationSchema.InitializationContext context)
throws Exception {
+ super.open(context);
+ isOpen = true;
+ }
+
+ public Boolean isOpen() {
+ return isOpen;
+ }
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkTest.java
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkTest.java
new file mode 100644
index 0000000..b0a361c
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Properties;
+
+import static
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static
org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+import static
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+
+/** Covers construction, defaults and sanity checking of {@link SqsSink}. */
+class SqsSinkTest {
+
+ private static final ElementConverter<String,
SendMessageBatchRequestEntry> elementConverter =
+ SqsSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ @Test
+ void sqsUrlMustNotBeNull() {
+ Assertions.assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(
+ () ->
+ new SqsSink<>(
+ elementConverter,
+ 10,
+ 50,
+ 5000,
+ 256000L,
+ 5000L,
+ 256 * 1000L,
+ false,
+ null,
+ new Properties()))
+ .withMessageContaining(
+ "The sqs url must not be null when initializing the
SQS Sink.");
+ }
+
+ @Test
+ void sqsUrlMustNotBeEmpty() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ new SqsSink<>(
+ elementConverter,
+ 500,
+ 16,
+ 10000,
+ 4 * 1024 * 1024L,
+ 5000L,
+ 1000 * 1024L,
+ false,
+ "",
+ new Properties()))
+ .withMessageContaining("The sqs url must be set when
initializing the SQS Sink.");
+ }
+
+ @Test
+ void sqsMaxBatchSizeMustNotBeGreaterThan10() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ new SqsSink<>(
+ elementConverter,
+ 100,
+ 50,
+ 5000,
+ 256000L,
+ 5000L,
+ 256 * 1000L,
+ false,
+ "testSqlUrl",
+ new Properties()))
+ .withMessageContaining("The sqs MaxBatchSize must not be
greater than 10.");
+ }
+
+ @Test
+ void sqsSinkFailsWhenAccessKeyIdIsNotProvided() {
+ Properties properties = createConfig("https://non-exisitent-location");
+ properties.setProperty(
+ AWS_CREDENTIALS_PROVIDER,
AWSConfigConstants.CredentialProvider.BASIC.toString());
+
properties.remove(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER));
+
sqsSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+ properties, "Please set values for AWS Access Key ID");
+ }
+
+ @Test
+ void sqsSinkFailsWhenRegionIsNotProvided() {
+ Properties properties = createConfig("https://non-exisitent-location");
+ properties.remove(AWS_REGION);
+
sqsSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+ properties, "region must not be null.");
+ }
+
+ @Test
+ void sqsSinkFailsWhenUnableToConnectToRemoteService() {
+ Properties properties = createConfig("https://non-exisitent-location");
+ properties.remove(TRUST_ALL_CERTIFICATES);
+
sqsSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+ properties,
+ "Received an UnknownHostException when attempting to interact
with a service.");
+ }
+
+ private void
sqsSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+ Properties properties, String errorMessage) {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ SqsSink<String> sqsSink =
+ SqsSink.<String>builder()
+ .setSqsSinkElementConverter(
+ SqsSinkElementConverter.<String>builder()
+ .setSerializationSchema(new
SimpleStringSchema())
+ .build())
+ .setSqsUrl("sqs-url")
+ .setMaxBatchSize(5)
+ .setSqsClientProperties(properties)
+ .build();
+
+ SqsTestUtils.getSampleDataGenerator(env, 10).sinkTo(sqsSink);
+
+ Assertions.assertThatExceptionOfType(JobExecutionException.class)
+ .isThrownBy(() -> env.execute("Integration Test"))
+ .havingCause()
+ .havingCause()
+ .withMessageContaining(errorMessage);
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
new file mode 100644
index 0000000..38c25f9
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchRequestTooLongException;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Covers construction, defaults and sanity checking of {@link
SqsSinkWriter}. */
+public class SqsSinkWriterTest {
+
+ private SqsSinkWriter<String> sinkWriter;
+
+ private static final ElementConverter<String, SendMessageBatchRequestEntry>
+ ELEMENT_CONVERTER_PLACEHOLDER =
+ SqsSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ @Test
+ public void testNonRetryableExceptionWhenFailOnErrorFalseWillNotRetry()
throws IOException {
+ Optional<Exception> exceptionToThrow =
getGenericNonRetryableException();
+ ThrowingSqsAsyncClient sqsAsyncClient = new
ThrowingSqsAsyncClient(exceptionToThrow);
+ TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+ new TestSqsAsyncClientProvider(sqsAsyncClient);
+ sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+ CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
+ new CompletableFuture<>();
+ Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
+ failedRequests::complete;
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(),
failedRequestConsumer);
+ assertThat(failedRequests).isNotCompleted();
+ }
+
+ @Test
+ public void testRetryableExceptionWhenFailOnErrorTrueWillNotRetry() throws
IOException {
+ Optional<Exception> exceptionToThrow = getGenericRetryableException();
+ ThrowingSqsAsyncClient sqsAsyncClient = new
ThrowingSqsAsyncClient(exceptionToThrow);
+ TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+ new TestSqsAsyncClientProvider(sqsAsyncClient);
+ sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider);
+
+ CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
+ new CompletableFuture<>();
+ Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
+ failedRequests::complete;
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(),
failedRequestConsumer);
+ assertThat(failedRequests).isNotCompleted();
+ }
+
+ @Test
+ public void testRetryableExceptionWhenFailOnErrorFalseWillRetry() throws
IOException {
+ Optional<Exception> exceptionToThrow = getGenericRetryableException();
+ ThrowingSqsAsyncClient sqsAsyncClient = new
ThrowingSqsAsyncClient(exceptionToThrow);
+ TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+ new TestSqsAsyncClientProvider(sqsAsyncClient);
+ sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+ CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
+ new CompletableFuture<>();
+ Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
+ failedRequests::complete;
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(),
failedRequestConsumer);
+ assertThat(failedRequests).isCompleted();
+ }
+
+ @Test
+ public void testSubmitRequestEntriesWithNoException() throws IOException {
+ TrackingSqsAsyncClient sqsAsyncClient = new TrackingSqsAsyncClient();
+ TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+ new TestSqsAsyncClientProvider(sqsAsyncClient);
+ sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+ CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new
CompletableFuture<>();
+ Consumer<List<SendMessageBatchRequestEntry>> requestResult =
requests::complete;
+
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(),
requestResult);
+
+ List<SendMessageBatchRequestEntry> result = requests.join();
+ Assertions.assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void
testSubmitRequestEntriesWithPartialSuccessWithFailOnErrorFalseWillRetry()
+ throws IOException {
+ FailedSqsAsyncClient sqsAsyncClient = new
FailedSqsAsyncClient("testId2");
+ TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+ new TestSqsAsyncClientProvider(sqsAsyncClient);
+ sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+ CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new
CompletableFuture<>();
+ Consumer<List<SendMessageBatchRequestEntry>> requestResult =
requests::complete;
+
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(),
requestResult);
+
+ List<SendMessageBatchRequestEntry> result = requests.join();
+ Assertions.assertEquals(1, result.size());
+ }
+
+ @Test
+ public void
+
testSubmitRequestEntriesWithPartialSuccessWithFailOnErrorFalseAndFailedIdDoesNotMatchWillNotRetry()
+ throws IOException {
+ FailedSqsAsyncClient sqsAsyncClient = new
FailedSqsAsyncClient("invalidFailedRequestId");
+ TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+ new TestSqsAsyncClientProvider(sqsAsyncClient);
+ sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+ CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new
CompletableFuture<>();
+ Consumer<List<SendMessageBatchRequestEntry>> requestResult =
requests::complete;
+
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(),
requestResult);
+ assertThat(requests).isNotCompleted();
+ }
+
+ @Test
+ public void
testSubmitRequestEntriesWithPartialSuccessWithFailOnErrorTrueWillNotRetry()
+ throws IOException {
+
+ FailedSqsAsyncClient sqsAsyncClient = new
FailedSqsAsyncClient("testId2");
+ TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+ new TestSqsAsyncClientProvider(sqsAsyncClient);
+ sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider);
+
+ CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new
CompletableFuture<>();
+ Consumer<List<SendMessageBatchRequestEntry>> requestResult =
requests::complete;
+
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(),
requestResult);
+ assertThat(requests).isNotCompleted();
+ }
+
+ @Test
+ public void testClientClosesWhenWriterIsClosed() throws IOException {
+ TrackingSqsAsyncClient sqsAsyncClient = new TrackingSqsAsyncClient();
+ TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+ new TestSqsAsyncClientProvider(sqsAsyncClient);
+ sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+ sinkWriter.close();
+ assertThat(testSqsAsyncClientProvider.getCloseCount()).isEqualTo(1);
+ }
+
+ @Test
+ void testGetSizeInBytesReturnsSizeOfBlob() throws IOException {
+ TrackingSqsAsyncClient sqsAsyncClient = new TrackingSqsAsyncClient();
+ TestSqsAsyncClientProvider testSqsAsyncClientProvider =
+ new TestSqsAsyncClientProvider(sqsAsyncClient);
+ sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
+
+ String testString = "{many hands make light work;";
+ SendMessageBatchRequestEntry record =
+
SendMessageBatchRequestEntry.builder().messageBody(testString).build();
+ assertThat(sinkWriter.getSizeInBytes(record))
+ .isEqualTo(testString.getBytes(StandardCharsets.UTF_8).length);
+ }
+
+ @Test
+ void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
+ throws IOException, InterruptedException {
+ TestSinkInitContext ctx = new TestSinkInitContext();
+ SqsSink<String> sqsSink =
+ new SqsSink<>(
+ ELEMENT_CONVERTER_PLACEHOLDER,
+ 10,
+ 16,
+ 10000,
+ 4 * 1024 * 1024L,
+ 5000L,
+ 1000 * 1024L,
+ true,
+ "test-stream",
+
AWSServicesTestUtils.createConfig("https://localhost"));
+ SinkWriter<String> writer = sqsSink.createWriter(ctx);
+
+ for (int i = 0; i < 12; i++) {
+ writer.write("data_bytes", null);
+ }
+ assertThatExceptionOfType(CompletionException.class)
+ .isThrownBy(() -> writer.flush(true))
+ .withCauseInstanceOf(SdkClientException.class)
+ .withMessageContaining(
+ "Unable to execute HTTP request: Connection refused:
localhost/127.0.0.1:443");
+
assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(10);
+
assertThat(ctx.metricGroup().getNumRecordsSendErrorsCounter().getCount()).isEqualTo(10);
+ }
+
+ private SqsSinkWriter<String> getSqsSinkWriter(
+ final boolean failOnError, TestSqsAsyncClientProvider
testSqsAsyncClientProvider)
+ throws IOException {
+ TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+ Properties sinkProperties =
AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
+ SqsSink<String> sink =
+ new SqsSink<>(
+ ELEMENT_CONVERTER_PLACEHOLDER,
+ 10,
+ 16,
+ 10000,
+ 4 * 1024 * 1024L,
+ 5000L,
+ 1000 * 1024L,
+ failOnError,
+
"https://sqs.us-east-2.amazonaws.com/618277569814/fake-sqs",
+ sinkProperties);
+ sink.setSqsAsyncClientProvider(testSqsAsyncClientProvider);
+ SqsSinkWriter sqsSinkWriter = (SqsSinkWriter<String>)
sink.createWriter(sinkInitContext);
+ return sqsSinkWriter;
+ }
+
+ private List<SendMessageBatchRequestEntry> getDefaultInputRequests() {
+ return Arrays.asList(
+ sinkSendMessageBatchRequestEntry("test1", "testId1"),
+ sinkSendMessageBatchRequestEntry("test2", "testId2"));
+ }
+
+ private SendMessageBatchRequestEntry sinkSendMessageBatchRequestEntry(
+ final String messageBody, final String id) {
+ return
SendMessageBatchRequestEntry.builder().id(id).messageBody(messageBody).build();
+ }
+
+ private Optional<Exception> getGenericRetryableException() {
+ return Optional.of(
+ BatchRequestTooLongException.builder()
+ .awsErrorDetails(
+ AwsErrorDetails.builder()
+
.errorCode("SomeErrorCodeThatIsNotUsed")
+ .build())
+ .build());
+ }
+
+ private Optional<Exception> getGenericNonRetryableException() {
+ return Optional.of(
+ ResourceNotFoundException.builder()
+ .awsErrorDetails(
+ AwsErrorDetails.builder()
+
.errorCode("SomeErrorCodeThatIsNotUsed")
+ .build())
+ .build());
+ }
+
+ private static class ThrowingSqsAsyncClient<T extends Throwable>
implements SqsAsyncClient {
+
+ private final Optional<T> errorToReturn;
+
+ private ThrowingSqsAsyncClient(Optional<T> errorToReturn) {
+ this.errorToReturn = errorToReturn;
+ }
+
+ @Override
+ public String serviceName() {
+ return "SQS";
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public CompletableFuture<SendMessageBatchResponse> sendMessageBatch(
+ SendMessageBatchRequest sendMessageBatchRequest) {
+ CompletableFuture<SendMessageBatchResponse> future = new
CompletableFuture<>();
+ future.completeExceptionally(
+
SqsException.builder().cause((errorToReturn.get())).build());
+ return future;
+ }
+ }
+
+ private static class TrackingSqsAsyncClient implements SqsAsyncClient {
+
+ @Override
+ public String serviceName() {
+ return "SQS";
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public CompletableFuture<SendMessageBatchResponse> sendMessageBatch(
+ SendMessageBatchRequest sendMessageBatchRequest) {
+ SendMessageBatchResultEntry sendMessageBatchResultEntry =
+ SendMessageBatchResultEntry.builder().build();
+ List<SendMessageBatchResultEntry> sendMessageBatchResultEntryList
= new ArrayList<>();
+ sendMessageBatchResultEntryList.add(sendMessageBatchResultEntry);
+ return CompletableFuture.completedFuture(
+ SendMessageBatchResponse.builder()
+ .failed(Collections.emptyList())
+ .successful(sendMessageBatchResultEntryList)
+ .build());
+ }
+ }
+
+ private static class FailedSqsAsyncClient implements SqsAsyncClient {
+
+ private final String failedRequestId;
+
+ private FailedSqsAsyncClient(String failedRequestId) {
+ this.failedRequestId = failedRequestId;
+ }
+
+ @Override
+ public String serviceName() {
+ return "SQS";
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public CompletableFuture<SendMessageBatchResponse> sendMessageBatch(
+ SendMessageBatchRequest sendMessageBatchRequest) {
+ BatchResultErrorEntry batchResultErrorEntry =
+
BatchResultErrorEntry.builder().id(failedRequestId).build();
+ List<BatchResultErrorEntry> batchResultErrorEntryList = new
ArrayList<>();
+ batchResultErrorEntryList.add(batchResultErrorEntry);
+
+ SendMessageBatchResultEntry sendMessageBatchResultEntry =
+
SendMessageBatchResultEntry.builder().id("testId1").build();
+ List<SendMessageBatchResultEntry> sendMessageBatchResultEntryList
= new ArrayList<>();
+ sendMessageBatchResultEntryList.add(sendMessageBatchResultEntry);
+ return CompletableFuture.completedFuture(
+ SendMessageBatchResponse.builder()
+ .failed(batchResultErrorEntryList)
+ .successful(sendMessageBatchResultEntryList)
+ .build());
+ }
+ }
+
+ private static class TestSqsAsyncClientProvider implements
SdkClientProvider<SqsAsyncClient> {
+
+ private final SqsAsyncClient sqsAsyncClient;
+ private int closeCount = 0;
+
+ private TestSqsAsyncClientProvider(SqsAsyncClient sqsAsyncClient) {
+ this.sqsAsyncClient = sqsAsyncClient;
+ }
+
+ @Override
+ public SqsAsyncClient getClient() {
+ return sqsAsyncClient;
+ }
+
+ @Override
+ public void close() {
+ closeCount++;
+ }
+
+ public int getCloseCount() {
+ return closeCount;
+ }
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsStateSerializerTest.java
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsStateSerializerTest.java
new file mode 100644
index 0000000..48b4ff7
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsStateSerializerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;
+
+/** Test class for {@link SqsStateSerializer}. */
+class SqsStateSerializerTest {
+
+ private static final ElementConverter<String,
SendMessageBatchRequestEntry> ELEMENT_CONVERTER =
+ SqsSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ @Test
+ void testSerializeAndDeserialize() throws IOException {
+ BufferedRequestState<SendMessageBatchRequestEntry> expectedState =
+ getTestState(ELEMENT_CONVERTER, this::getRequestSize);
+
+ SqsStateSerializer serializer = new SqsStateSerializer();
+
+ BufferedRequestState<SendMessageBatchRequestEntry> actualState =
+ serializer.deserialize(1, serializer.serialize(expectedState));
+
+ assertThatBufferStatesAreEqual(actualState, expectedState);
+ }
+
+ private int getRequestSize(SendMessageBatchRequestEntry requestEntry) {
+ return requestEntry.messageBody().length();
+ }
+
+ private <T extends Serializable> void assertThatBufferStatesAreEqual(
+ BufferedRequestState<SendMessageBatchRequestEntry> actual,
+ BufferedRequestState<SendMessageBatchRequestEntry> expected) {
+
Assertions.assertThat(actual.getStateSize()).isEqualTo(expected.getStateSize());
+ int actualLength = actual.getBufferedRequestEntries().size();
+
Assertions.assertThat(actualLength).isEqualTo(expected.getBufferedRequestEntries().size());
+ List<RequestEntryWrapper<SendMessageBatchRequestEntry>> actualRequests
=
+ actual.getBufferedRequestEntries();
+ List<RequestEntryWrapper<SendMessageBatchRequestEntry>>
expectedRequests =
+ expected.getBufferedRequestEntries();
+
+ for (int i = 0; i < actualLength; ++i) {
+
Assertions.assertThat((actualRequests.get(i)).getRequestEntry().messageBody())
+
.isEqualTo((expectedRequests.get(i)).getRequestEntry().messageBody());
+
Assertions.assertThat((actualRequests.get(i)).getRequestEntry().id())
+
.isEqualTo((expectedRequests.get(i)).getRequestEntry().id());
+
Assertions.assertThat((actualRequests.get(i)).getRequestEntry().id()).isNotNull();
+ }
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/testutils/SqsTestUtils.java
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/testutils/SqsTestUtils.java
new file mode 100644
index 0000000..d7e69b1
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/testutils/SqsTestUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the
Localstack container.
+ */
+public class SqsTestUtils {
+
+ private static final ObjectMapper MAPPER = createObjectMapper();
+
+ public static SqsClient createSqsClient(String endpoint, SdkHttpClient
httpClient) {
+ return AWSServicesTestUtils.createAwsSyncClient(endpoint, httpClient,
SqsClient.builder());
+ }
+
+ public static DataStream<String> getSampleDataGenerator(
+ StreamExecutionEnvironment env, int endValue) {
+ return env.fromSequence(1, endValue)
+ .map(Object::toString)
+ .returns(String.class)
+ .map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data",
data)));
+ }
+
+ public static void createSqs(String sqsName, SqsClient sqsClient) {
+ CreateQueueRequest createQueueRequest =
+ CreateQueueRequest.builder().queueName(sqsName).build();
+
+ sqsClient.createQueue(createQueueRequest);
+ }
+
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper;
+ }
+}
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/resources/archunit.properties
b/flink-connector-aws/flink-connector-sqs/src/test/resources/archunit.properties
new file mode 100644
index 0000000..15be88c
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/resources/archunit.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# By default we allow removing existing violations, but fail when new
violations are added.
+freeze.store.default.allowStoreUpdate=true
+
+# Enable this if a new (frozen) rule has been added in order to create the
initial store and record the existing violations.
+#freeze.store.default.allowStoreCreation=true
+
+# Enable this to add allow new violations to be recorded.
+# NOTE: Adding new violations should be avoided when possible. If the rule was
correct to flag a new
+# violation, please try to avoid creating the violation. If the
violation was created due to a
+# shortcoming of the rule, file a JIRA issue so the rule can be improved.
+#freeze.refreeze=true
+
+freeze.store.default.path=archunit-violations
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/resources/log4j2-test.properties
b/flink-connector-aws/flink-connector-sqs/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..c4fa187
--- /dev/null
+++
b/flink-connector-aws/flink-connector-sqs/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-aws/pom.xml b/flink-connector-aws/pom.xml
index 4ac5b80..678a279 100644
--- a/flink-connector-aws/pom.xml
+++ b/flink-connector-aws/pom.xml
@@ -18,8 +18,8 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -38,6 +38,7 @@ under the License.
<module>flink-connector-aws-kinesis-firehose</module>
<module>flink-connector-aws-kinesis-streams</module>
<module>flink-connector-kinesis</module>
+ <module>flink-connector-sqs</module>
<module>flink-sql-connector-dynamodb</module>
<module>flink-sql-connector-aws-kinesis-firehose</module>
@@ -45,4 +46,4 @@ under the License.
<module>flink-sql-connector-kinesis</module>
</modules>
-</project>
+</project>
\ No newline at end of file