This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e908be6e5344f6220a1329580d9e4535cd6ceb6
Author: Zichen Liu <[email protected]>
AuthorDate: Thu Dec 23 11:57:09 2021 +0000

    [FLINK-24228][connectors/firehose] Amazon Kinesis Data Firehose sink based 
on the Async Sink Base implemented:
     - Refactored AWSUnifiedSinksUtil into a class that caters for Kinesis and 
Firehose
     - Extracted commonalities between KDS & KDF sinks into 
flink-connector-aws-base
     - Implemented integration test based on Localstack container
     - Changed host/container ports to be different, made HTTP/1.1 the 
default, fixed Localstack issue
     - Added docs page, changed type in Firehose, turned logging off, removed 
unused dependencies.
---
 flink-connectors/flink-connector-aws-base/pom.xml  |  21 +++
 .../connector/aws/util/AWSAsyncSinkUtil.java}      | 107 ++++++++-------
 .../aws/testutils/AWSServicesTestUtils.java        | 129 ++++++++++++++++++
 .../aws/testutils/LocalstackContainer.java         |  78 +++++++++++
 .../connector/aws/util/AWSAsyncSinkUtilTest.java}  | 150 ++++++++++++---------
 .../pom.xml                                        |  12 --
 .../KinesisDataStreamsConfigConstants.java}        |   9 +-
 .../kinesis/sink/KinesisDataStreamsSinkWriter.java |  10 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../pom.xml                                        |  43 +++---
 .../sink/KinesisFirehoseConfigConstants.java}      |  17 ++-
 .../firehose/sink/KinesisFirehoseException.java    |  54 ++++++++
 .../firehose/sink/KinesisFirehoseSink.java         | 115 ++++++++++++++++
 .../firehose/sink/KinesisFirehoseSinkBuilder.java  | 149 ++++++++++++++++++++
 .../sink/KinesisFirehoseSinkElementConverter.java  |  76 +++++++++++
 .../firehose/sink/KinesisFirehoseSinkWriter.java}  | 120 +++++++++--------
 .../src/main/resources/log4j2.properties           |  25 ++++
 .../sink/KinesisFirehoseSinkBuilderTest.java       |  70 ++++++++++
 .../KinesisFirehoseSinkElementConverterTest.java   |  54 ++++++++
 .../firehose/sink/KinesisFirehoseSinkITCase.java   | 124 +++++++++++++++++
 .../firehose/sink/KinesisFirehoseSinkTest.java     |  76 +++++++++++
 .../sink/KinesisFirehoseSinkWriterTest.java        | 107 +++++++++++++++
 .../firehose/sink/examples/SinkIntoFirehose.java   |  74 ++++++++++
 .../sink/testutils/KinesisFirehoseTestUtils.java   |  73 ++++++++++
 .../src/test/resources/log4j2-test.properties      |   2 +-
 flink-connectors/flink-connector-base/pom.xml      |  16 +++
 .../base/sink/writer/AsyncSinkWriterTest.java      | 120 +----------------
 .../base/sink/writer/TestSinkInitContext.java      | 139 +++++++++++++++++++
 .../connectors/kinesis/proxy/KinesisProxyV2.java   |   4 +-
 .../kinesis/proxy/KinesisProxyV2Factory.java       |  18 ++-
 .../streaming/connectors/kinesis/util/AWSUtil.java |  11 +-
 .../connectors/kinesis/util/AwsV2Util.java         |   8 ++
 .../connectors/kinesis/util/AwsV2UtilTest.java     |  34 +++++
 flink-connectors/pom.xml                           |   1 +
 .../org/apache/flink/util/DockerImageVersions.java |   2 +
 tools/ci/stage.sh                                  |   1 +
 36 files changed, 1697 insertions(+), 354 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-base/pom.xml 
b/flink-connectors/flink-connector-aws-base/pom.xml
index b774b2e..870e487 100644
--- a/flink-connectors/flink-connector-aws-base/pom.xml
+++ b/flink-connectors/flink-connector-aws-base/pom.xml
@@ -63,6 +63,27 @@ under the License.
                        <artifactId>sts</artifactId>
                        <version>${aws.sdk.version}</version>
                </dependency>
+
+               <!-- Test dependencies -->
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>testcontainers</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>s3</artifactId>
+                       <version>${aws.sdk.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>iam</artifactId>
+                       <version>${aws.sdk.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java
 
b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
similarity index 61%
rename from 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java
rename to 
flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
index 1ec75aa..1256142 100644
--- 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java
+++ 
b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
@@ -15,24 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.util;
+package org.apache.flink.connector.aws.util;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.aws.util.AWSGeneralUtil;
-import 
org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
 import software.amazon.awssdk.core.client.config.SdkClientOption;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
-import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
-import 
software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
 
 import java.net.URI;
 import java.util.Optional;
@@ -40,11 +37,10 @@ import java.util.Properties;
 
 /** Some utilities specific to Amazon Web Service. */
 @Internal
-public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil {
+public class AWSAsyncSinkUtil extends AWSGeneralUtil {
 
-    /** Used for formatting Flink-specific user agent string when creating 
Kinesis client. */
-    private static final String USER_AGENT_FORMAT =
-            
AWSKinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT + " 
V2";
+    /** V2 suffix to denote the unified sinks. V1 sinks are based on KPL etc. 
*/
+    static final String V2_USER_AGENT_SUFFIX = " V2";
 
     /**
      * Creates a user agent prefix for Flink. This can be used by HTTP Clients.
@@ -62,51 +58,61 @@ public class AWSKinesisDataStreamsUtil extends 
AWSGeneralUtil {
 
     /**
      * @param configProps configuration properties
-     * @param httpClient the underlying HTTP client used to talk to Kinesis
-     * @return a new Amazon Kinesis Client
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Client
      */
-    public static KinesisAsyncClient createKinesisAsyncClient(
-            final Properties configProps, final SdkAsyncHttpClient httpClient) 
{
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final SdkAsyncHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
         SdkClientConfiguration clientConfiguration = 
SdkClientConfiguration.builder().build();
-        return createKinesisAsyncClient(configProps, clientConfiguration, 
httpClient);
+        return createAwsAsyncClient(
+                configProps,
+                clientConfiguration,
+                httpClient,
+                clientBuilder,
+                awsUserAgentPrefixFormat,
+                awsClientUserAgentPrefix);
     }
 
     /**
      * @param configProps configuration properties
      * @param clientConfiguration the AWS SDK v2 config to instantiate the 
client
-     * @param httpClient the underlying HTTP client used to talk to Kinesis
-     * @return a new Amazon Kinesis Client
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Client
      */
-    public static KinesisAsyncClient createKinesisAsyncClient(
-            final Properties configProps,
-            final SdkClientConfiguration clientConfiguration,
-            final SdkAsyncHttpClient httpClient) {
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final SdkClientConfiguration clientConfiguration,
+                    final SdkAsyncHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
         String flinkUserAgentPrefix =
-                Optional.ofNullable(
-                                configProps.getProperty(
-                                        AWSKinesisDataStreamsConfigConstants
-                                                
.KINESIS_CLIENT_USER_AGENT_PREFIX))
-                        .orElse(formatFlinkUserAgentPrefix(USER_AGENT_FORMAT));
+                
Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix))
+                        .orElse(
+                                formatFlinkUserAgentPrefix(
+                                        awsUserAgentPrefixFormat + 
V2_USER_AGENT_SUFFIX));
 
         final ClientOverrideConfiguration overrideConfiguration =
                 createClientOverrideConfiguration(
                         clientConfiguration,
                         ClientOverrideConfiguration.builder(),
                         flinkUserAgentPrefix);
-        final KinesisAsyncClientBuilder clientBuilder = 
KinesisAsyncClient.builder();
 
-        return createKinesisAsyncClient(
-                configProps, clientBuilder, httpClient, overrideConfiguration);
-    }
-
-    @VisibleForTesting
-    static ClientOverrideConfiguration createClientOverrideConfiguration(
-            final SdkClientConfiguration config,
-            final ClientOverrideConfiguration.Builder 
overrideConfigurationBuilder) {
-        return createClientOverrideConfiguration(
-                config,
-                overrideConfigurationBuilder,
-                formatFlinkUserAgentPrefix(USER_AGENT_FORMAT));
+        return createAwsAsyncClient(configProps, clientBuilder, httpClient, 
overrideConfiguration);
     }
 
     @VisibleForTesting
@@ -131,11 +137,16 @@ public class AWSKinesisDataStreamsUtil extends 
AWSGeneralUtil {
     }
 
     @VisibleForTesting
-    static KinesisAsyncClient createKinesisAsyncClient(
-            final Properties configProps,
-            final KinesisAsyncClientBuilder clientBuilder,
-            final SdkAsyncHttpClient httpClient,
-            final ClientOverrideConfiguration overrideConfiguration) {
+    static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final T clientBuilder,
+                    final SdkAsyncHttpClient httpClient,
+                    final ClientOverrideConfiguration overrideConfiguration) {
 
         if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
             final URI endpointOverride =
@@ -150,10 +161,4 @@ public class AWSKinesisDataStreamsUtil extends 
AWSGeneralUtil {
                 .region(getRegion(configProps))
                 .build();
     }
-
-    public static boolean isRecoverableException(Exception e) {
-        Throwable cause = e.getCause();
-        return cause instanceof LimitExceededException
-                || cause instanceof ProvisionedThroughputExceededException;
-    }
 }
diff --git 
a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
 
b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
new file mode 100644
index 0000000..543d81b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.testutils;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.waiters.WaiterResponse;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.iam.IamAsyncClient;
+import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
+import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.waiters.S3AsyncWaiter;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the 
Localstack container.
+ */
+public class AWSServicesTestUtils {
+
+    private static final String ACCESS_KEY_ID = "accessKeyId";
+    private static final String SECRET_ACCESS_KEY = "secretAccessKey";
+
+    public static S3AsyncClient getS3Client(String endpoint) throws 
URISyntaxException {
+        return S3AsyncClient.builder()
+                .httpClient(getHttpClient(endpoint))
+                .region(Region.AP_SOUTHEAST_1)
+                .endpointOverride(new URI(endpoint))
+                .credentialsProvider(getDefaultCredentials())
+                .build();
+    }
+
+    public static IamAsyncClient getIamClient(String endpoint) throws 
URISyntaxException {
+        return IamAsyncClient.builder()
+                .httpClient(getHttpClient(endpoint))
+                .region(Region.AWS_GLOBAL)
+                .endpointOverride(new URI(endpoint))
+                .credentialsProvider(getDefaultCredentials())
+                .build();
+    }
+
+    public static AwsCredentialsProvider getDefaultCredentials() {
+        return StaticCredentialsProvider.create(
+                AwsBasicCredentials.create(ACCESS_KEY_ID, SECRET_ACCESS_KEY));
+    }
+
+    public static Properties getConfig(String endpoint) {
+        Properties config = new Properties();
+        config.setProperty(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
+        config.setProperty(AWS_ENDPOINT, endpoint);
+        
config.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), 
ACCESS_KEY_ID);
+        config.setProperty(
+                AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), 
SECRET_ACCESS_KEY);
+        config.setProperty(TRUST_ALL_CERTIFICATES, "true");
+        return config;
+    }
+
+    public static SdkAsyncHttpClient getHttpClient(String endpoint) {
+        return AWSGeneralUtil.createAsyncHttpClient(getConfig(endpoint));
+    }
+
+    public static void createBucket(S3AsyncClient s3Client, String bucketName)
+            throws ExecutionException, InterruptedException {
+        CreateBucketRequest bucketRequest =
+                CreateBucketRequest.builder().bucket(bucketName).build();
+        s3Client.createBucket(bucketRequest);
+
+        HeadBucketRequest bucketRequestWait =
+                HeadBucketRequest.builder().bucket(bucketName).build();
+
+        S3AsyncWaiter s3Waiter = s3Client.waiter();
+        CompletableFuture<WaiterResponse<HeadBucketResponse>> 
waiterResponseFuture =
+                s3Waiter.waitUntilBucketExists(bucketRequestWait);
+
+        waiterResponseFuture.get();
+    }
+
+    public static void createIAMRole(IamAsyncClient iam, String roleName)
+            throws ExecutionException, InterruptedException {
+        CreateRoleRequest request = 
CreateRoleRequest.builder().roleName(roleName).build();
+
+        CompletableFuture<CreateRoleResponse> responseFuture = 
iam.createRole(request);
+        responseFuture.get();
+    }
+
+    public static List<S3Object> listBucketObjects(S3AsyncClient s3, String 
bucketName)
+            throws ExecutionException, InterruptedException {
+        ListObjectsRequest listObjects = 
ListObjectsRequest.builder().bucket(bucketName).build();
+        CompletableFuture<ListObjectsResponse> res = 
s3.listObjects(listObjects);
+        return res.get().contents();
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
 
b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
new file mode 100644
index 0000000..bdc0517
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.testutils;
+
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A class wrapping the Localstack container that provides mock 
implementations of many common AWS
+ * services.
+ */
+public class LocalstackContainer extends GenericContainer<LocalstackContainer> 
{
+
+    private static final int CONTAINER_PORT = 4566;
+
+    public LocalstackContainer(DockerImageName imageName) {
+        super(imageName);
+        withExposedPorts(CONTAINER_PORT);
+        waitingFor(new ListBucketObjectsWaitStrategy());
+    }
+
+    public String getEndpoint() {
+        return String.format("https://%s:%s", getHost(), 
getMappedPort(CONTAINER_PORT));
+    }
+
+    private class ListBucketObjectsWaitStrategy extends AbstractWaitStrategy {
+        private static final int TRANSACTIONS_PER_SECOND = 1;
+
+        private final RateLimiter rateLimiter =
+                RateLimiterBuilder.newBuilder()
+                        .withRate(TRANSACTIONS_PER_SECOND, SECONDS)
+                        .withConstantThroughput()
+                        .build();
+
+        @Override
+        protected void waitUntilReady() {
+            Unreliables.retryUntilSuccess(
+                    (int) startupTimeout.getSeconds(),
+                    SECONDS,
+                    () -> rateLimiter.getWhenReady(this::list));
+        }
+
+        private List<S3Object> list()
+                throws ExecutionException, InterruptedException, 
URISyntaxException {
+            String bucketName = "bucket-name-not-to-be-used";
+            S3AsyncClient client = 
AWSServicesTestUtils.getS3Client(getEndpoint());
+            AWSServicesTestUtils.createBucket(client, bucketName);
+            return AWSServicesTestUtils.listBucketObjects(client, bucketName);
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java
 
b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
similarity index 62%
rename from 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java
rename to 
flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
index 0d6eae8..ccecdb8 100644
--- 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java
+++ 
b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.util;
-
-import org.apache.flink.connector.aws.util.TestUtil;
-import 
org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
+package org.apache.flink.connector.aws.util;
 
 import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
@@ -29,18 +31,14 @@ import 
software.amazon.awssdk.core.client.config.SdkClientOption;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
-import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
 
 import java.net.URI;
 import java.time.Duration;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
 
 import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
 import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static 
org.apache.flink.connector.aws.util.AWSAsyncSinkUtil.formatFlinkUserAgentPrefix;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
@@ -48,20 +46,23 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-/** Tests for {@link AWSKinesisDataStreamsUtil}. */
-public class AWSKinesisDataStreamsUtilTest {
+/** Tests for {@link AWSAsyncSinkUtil}. */
+public class AWSAsyncSinkUtilTest {
+
     private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT =
-            
AWSKinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT + " 
V2";
+            "Apache Flink %s (%s) *Destination* Connector";
+    private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT_V2 =
+            "Apache Flink %s (%s) *Destination* Connector V2";
 
     @Test
     public void testCreateKinesisAsyncClient() {
         Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
-        KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+        MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
         ClientOverrideConfiguration clientOverrideConfiguration =
                 ClientOverrideConfiguration.builder().build();
         SdkAsyncHttpClient httpClient = 
NettyNioAsyncHttpClient.builder().build();
 
-        AWSKinesisDataStreamsUtil.createKinesisAsyncClient(
+        AWSAsyncSinkUtil.createAwsAsyncClient(
                 properties, builder, httpClient, clientOverrideConfiguration);
 
         verify(builder).overrideConfiguration(clientOverrideConfiguration);
@@ -77,12 +78,12 @@ public class AWSKinesisDataStreamsUtilTest {
         Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
         properties.setProperty(AWS_ENDPOINT, "https://localhost");
 
-        KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+        MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
         ClientOverrideConfiguration clientOverrideConfiguration =
                 ClientOverrideConfiguration.builder().build();
         SdkAsyncHttpClient httpClient = 
NettyNioAsyncHttpClient.builder().build();
 
-        AWSKinesisDataStreamsUtil.createKinesisAsyncClient(
+        AWSAsyncSinkUtil.createAwsAsyncClient(
                 properties, builder, httpClient, clientOverrideConfiguration);
 
         verify(builder).endpointOverride(URI.create("https://localhost"));
@@ -94,14 +95,17 @@ public class AWSKinesisDataStreamsUtilTest {
 
         ClientOverrideConfiguration.Builder builder = 
mockClientOverrideConfigurationBuilder();
 
-        
AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration,
 builder);
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT + 
AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
 
         verify(builder).build();
         verify(builder)
                 .putAdvancedOption(
                         SdkAdvancedClientOption.USER_AGENT_PREFIX,
-                        AWSKinesisDataStreamsUtil.formatFlinkUserAgentPrefix(
-                                DEFAULT_USER_AGENT_PREFIX_FORMAT));
+                        
formatFlinkUserAgentPrefix(DEFAULT_USER_AGENT_PREFIX_FORMAT_V2));
         
verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, 
null);
         verify(builder, never()).apiCallAttemptTimeout(any());
         verify(builder, never()).apiCallTimeout(any());
@@ -116,7 +120,11 @@ public class AWSKinesisDataStreamsUtilTest {
 
         ClientOverrideConfiguration.Builder builder = 
mockClientOverrideConfigurationBuilder();
 
-        
AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration,
 builder);
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT + 
AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
 
         
verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, 
"suffix");
     }
@@ -130,7 +138,12 @@ public class AWSKinesisDataStreamsUtilTest {
 
         ClientOverrideConfiguration.Builder builder = 
mockClientOverrideConfigurationBuilder();
 
-        
AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration,
 builder);
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT_V2
+                                + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
 
         verify(builder).apiCallAttemptTimeout(Duration.ofMillis(500));
     }
@@ -144,49 +157,18 @@ public class AWSKinesisDataStreamsUtilTest {
 
         ClientOverrideConfiguration.Builder builder = 
mockClientOverrideConfigurationBuilder();
 
-        
AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration,
 builder);
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT_V2
+                                + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
 
         verify(builder).apiCallTimeout(Duration.ofMillis(600));
     }
 
-    @Test
-    public void testIsRecoverableExceptionForRecoverable() {
-        Exception recoverable = LimitExceededException.builder().build();
-        assertTrue(
-                AWSKinesisDataStreamsUtil.isRecoverableException(
-                        new ExecutionException(recoverable)));
-    }
-
-    @Test
-    public void testIsRecoverableExceptionForNonRecoverable() {
-        Exception nonRecoverable = new IllegalArgumentException("abc");
-        assertFalse(
-                AWSKinesisDataStreamsUtil.isRecoverableException(
-                        new ExecutionException(nonRecoverable)));
-    }
-
-    @Test
-    public void 
testIsRecoverableExceptionForRuntimeExceptionWrappingRecoverable() {
-        Exception recoverable = LimitExceededException.builder().build();
-        Exception runtime = new RuntimeException("abc", recoverable);
-        assertTrue(AWSKinesisDataStreamsUtil.isRecoverableException(runtime));
-    }
-
-    @Test
-    public void 
testIsRecoverableExceptionForRuntimeExceptionWrappingNonRecoverable() {
-        Exception nonRecoverable = new IllegalArgumentException("abc");
-        Exception runtime = new RuntimeException("abc", nonRecoverable);
-        assertFalse(AWSKinesisDataStreamsUtil.isRecoverableException(runtime));
-    }
-
-    @Test
-    public void testIsRecoverableExceptionForNullCause() {
-        Exception nonRecoverable = new IllegalArgumentException("abc");
-        
assertFalse(AWSKinesisDataStreamsUtil.isRecoverableException(nonRecoverable));
-    }
-
-    private KinesisAsyncClientBuilder mockKinesisAsyncClientBuilder() {
-        KinesisAsyncClientBuilder builder = 
mock(KinesisAsyncClientBuilder.class);
+    private MockAsyncClientBuilder mockKinesisAsyncClientBuilder() {
+        MockAsyncClientBuilder builder = mock(MockAsyncClientBuilder.class);
         
when(builder.overrideConfiguration(any(ClientOverrideConfiguration.class)))
                 .thenReturn(builder);
         when(builder.httpClient(any())).thenReturn(builder);
@@ -205,4 +187,52 @@ public class AWSKinesisDataStreamsUtilTest {
 
         return builder;
     }
+
+    private static class MockAsyncClientBuilder
+            implements AwsAsyncClientBuilder<MockAsyncClientBuilder, 
SdkClient>,
+                    AwsClientBuilder<MockAsyncClientBuilder, SdkClient> {
+
+        @Override
+        public MockAsyncClientBuilder asyncConfiguration(
+                ClientAsyncConfiguration clientAsyncConfiguration) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder httpClient(SdkAsyncHttpClient 
sdkAsyncHttpClient) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder 
httpClientBuilder(SdkAsyncHttpClient.Builder builder) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder credentialsProvider(
+                AwsCredentialsProvider awsCredentialsProvider) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder region(Region region) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder overrideConfiguration(
+                ClientOverrideConfiguration clientOverrideConfiguration) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder endpointOverride(URI uri) {
+            return null;
+        }
+
+        @Override
+        public SdkClient build() {
+            return null;
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml 
b/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
index 235fc3c..be50a2e 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
@@ -70,18 +70,6 @@ under the License.
                        <version>${aws.sdk.version}</version>
                </dependency>
 
-               <dependency>
-                       <groupId>software.amazon.awssdk</groupId>
-                       <artifactId>netty-nio-client</artifactId>
-                       <version>${aws.sdk.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>software.amazon.awssdk</groupId>
-                       <artifactId>sts</artifactId>
-                       <version>${aws.sdk.version}</version>
-               </dependency>
-
                <!--Table API dependencies-->
                <dependency>
                        <groupId>org.apache.flink</groupId>
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
 
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java
similarity index 80%
copy from 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
copy to 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java
index b318292..a5a6020 100644
--- 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
+++ 
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.config;
+package org.apache.flink.connector.kinesis.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 
-/** Defaults for {@link AWSKinesisDataStreamsUtil}. */
+/** Defaults for {@link KinesisDataStreamsSinkWriter}. */
 @PublicEvolving
-public class AWSKinesisDataStreamsConfigConstants {
+public class KinesisDataStreamsConfigConstants {
 
     public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT =
             "Apache Flink %s (%s) Kinesis Connector";
 
-    /** Identifier for user agent prefix. */
+    /** Kinesis identifier for user agent prefix. */
     public static final String KINESIS_CLIENT_USER_AGENT_PREFIX =
             "aws.kinesis.client.user-agent-prefix";
 }
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
 
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
index b720e5e..b291c12 100644
--- 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++ 
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
@@ -18,10 +18,10 @@
 package org.apache.flink.connector.kinesis.sink;
 
 import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 
@@ -104,8 +104,12 @@ class KinesisDataStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRe
         final SdkAsyncHttpClient httpClient =
                 AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
 
-        return AWSKinesisDataStreamsUtil.createKinesisAsyncClient(
-                kinesisClientProperties, httpClient);
+        return AWSAsyncSinkUtil.createAwsAsyncClient(
+                kinesisClientProperties,
+                httpClient,
+                KinesisAsyncClient.builder(),
+                
KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+                
KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
 
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
index c5339e7..c4fa187 100644
--- 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
+++ 
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = INFO
+rootLogger.level = OFF
 rootLogger.appenderRef.test.ref = TestLogger
 
 appender.testlogger.name = TestLogger
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml 
b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
similarity index 83%
copy from flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
copy to flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
index 235fc3c..d391616 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
@@ -30,8 +30,8 @@ under the License.
                <relativePath>..</relativePath>
        </parent>
 
-       <artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
-       <name>Flink : Connectors : AWS Kinesis Data Streams</name>
+       <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
+       <name>Flink : Connectors : AWS Kinesis Data Firehose</name>
        <properties>
                <aws.sdk.version>2.17.52</aws.sdk.version>
        </properties>
@@ -66,7 +66,7 @@ under the License.
 
                <dependency>
                        <groupId>software.amazon.awssdk</groupId>
-                       <artifactId>kinesis</artifactId>
+                       <artifactId>firehose</artifactId>
                        <version>${aws.sdk.version}</version>
                </dependency>
 
@@ -76,19 +76,6 @@ under the License.
                        <version>${aws.sdk.version}</version>
                </dependency>
 
-               <dependency>
-                       <groupId>software.amazon.awssdk</groupId>
-                       <artifactId>sts</artifactId>
-                       <version>${aws.sdk.version}</version>
-               </dependency>
-
-               <!--Table API dependencies-->
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-common</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
                <!-- Test dependencies -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -105,28 +92,34 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
-               <!-- Kinesis table factory testing -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-common</artifactId>
+                       <artifactId>flink-connector-base</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
+
                <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>testcontainers</artifactId>
                        <scope>test</scope>
                </dependency>
 
                <dependency>
-                       <groupId>org.testcontainers</groupId>
-                       <artifactId>testcontainers</artifactId>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>s3</artifactId>
+                       <version>${aws.sdk.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>iam</artifactId>
+                       <version>${aws.sdk.version}</version>
                        <scope>test</scope>
                </dependency>
-    </dependencies>
+       </dependencies>
 
        <build>
                <plugins>
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
similarity index 62%
rename from 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
rename to 
flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
index b318292..527f74a 100644
--- 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.config;
+package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 
-/** Defaults for {@link AWSKinesisDataStreamsUtil}. */
+/** Defaults for {@link KinesisFirehoseSinkWriter}. */
 @PublicEvolving
-public class AWSKinesisDataStreamsConfigConstants {
+public class KinesisFirehoseConfigConstants {
 
-    public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT =
-            "Apache Flink %s (%s) Kinesis Connector";
+    public static final String BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT =
+            "Apache Flink %s (%s) Firehose Connector";
 
-    /** Identifier for user agent prefix. */
-    public static final String KINESIS_CLIENT_USER_AGENT_PREFIX =
-            "aws.kinesis.client.user-agent-prefix";
+    /** Firehose identifier for user agent prefix. */
+    public static final String FIREHOSE_CLIENT_USER_AGENT_PREFIX =
+            "aws.firehose.client.user-agent-prefix";
 }
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
new file mode 100644
index 0000000..ff6918c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@link RuntimeException} wrapper indicating the exception was thrown from 
the Kinesis Data
+ * Firehose Sink.
+ */
+@PublicEvolving
+class KinesisFirehoseException extends RuntimeException {
+
+    public KinesisFirehoseException(final String message) {
+        super(message);
+    }
+
+    public KinesisFirehoseException(final String message, final Throwable 
cause) {
+        super(message, cause);
+    }
+
+    /**
+     * When the flag {@code failOnError} is set in {@link 
KinesisFirehoseSinkWriter}, this exception
+     * is raised as soon as any exception occurs when writing to KDF.
+     */
+    static class KinesisFirehoseFailFastException extends 
KinesisFirehoseException {
+
+        public KinesisFirehoseFailFastException() {
+            super(
+                    "Encountered an exception while persisting records, not 
retrying due to {failOnError} being set.");
+        }
+
+        public KinesisFirehoseFailFastException(final Throwable cause) {
+            super(
+                    "Encountered an exception while persisting records, not 
retrying due to {failOnError} being set.",
+                    cause);
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
new file mode 100644
index 0000000..50f04d4
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * A Kinesis Data Firehose (KDF) Sink that performs async requests against a 
destination delivery
+ * stream using the buffering protocol specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link
+ * software.amazon.awssdk.services.firehose.FirehoseAsyncClient} to 
communicate with the AWS
+ * endpoint.
+ *
+ * <p>Please see the writer implementation in {@link KinesisFirehoseSinkWriter}
+ *
+ * @param <InputT> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class KinesisFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> 
{
+
+    private final boolean failOnError;
+    private final String deliveryStreamName;
+    private final Properties firehoseClientProperties;
+
+    KinesisFirehoseSink(
+            ElementConverter<InputT, Record> elementConverter,
+            Integer maxBatchSize,
+            Integer maxInFlightRequests,
+            Integer maxBufferedRequests,
+            Long maxBatchSizeInBytes,
+            Long maxTimeInBufferMS,
+            Long maxRecordSizeInBytes,
+            boolean failOnError,
+            String deliveryStreamName,
+            Properties firehoseClientProperties) {
+        super(
+                elementConverter,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.deliveryStreamName =
+                Preconditions.checkNotNull(
+                        deliveryStreamName,
+                        "The delivery stream name must not be null when 
initializing the KDF Sink.");
+        Preconditions.checkArgument(
+                !this.deliveryStreamName.isEmpty(),
+                "The delivery stream name must be set when initializing the 
KDF Sink.");
+        this.failOnError = failOnError;
+        this.firehoseClientProperties = firehoseClientProperties;
+    }
+
+    /**
+     * Create a {@link KinesisFirehoseSinkBuilder} to allow the fluent 
construction of a new {@code
+     * KinesisFirehoseSink}.
+     *
+     * @param <InputT> type of incoming records
+     * @return {@link KinesisFirehoseSinkBuilder}
+     */
+    public static <InputT> KinesisFirehoseSinkBuilder<InputT> builder() {
+        return new KinesisFirehoseSinkBuilder<>();
+    }
+
+    @Override
+    public SinkWriter<InputT, Void, Collection<Record>> createWriter(
+            InitContext context, List<Collection<Record>> states) {
+        return new KinesisFirehoseSinkWriter<>(
+                getElementConverter(),
+                context,
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                failOnError,
+                deliveryStreamName,
+                firehoseClientProperties);
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<Collection<Record>>> 
getWriterStateSerializer() {
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
new file mode 100644
index 0000000..ee22e1f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link KinesisFirehoseSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link 
KinesisFirehoseSink} that
+ * writes String values to a Kinesis Data Firehose delivery stream named 
delivery-stream-name.
+ *
+ * <pre>{@code
+ * private static final KinesisFirehoseSinkElementConverter<String> 
elementConverter =
+ *         KinesisFirehoseSinkElementConverter.<String>builder()
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ *
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * KinesisFirehoseSink<String> kdfSink =
+ *         KinesisFirehoseSink.<String>builder()
+ *                 .setElementConverter(elementConverter)
+ *                 .setDeliveryStreamName("delivery-stream-name")
+ *                 .setMaxBatchSize(20)
+ *                 .setFirehoseClientProperties(sinkProperties)
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 500
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 10000
+ *   <li>{@code maxBatchSizeInBytes} will be 4 MB i.e. {@code 4 * 1024 * 1024}
+ *   <li>{@code maxTimeInBufferMS} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 1000 KB i.e. {@code 1000 * 1024}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class KinesisFirehoseSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, Record, 
KinesisFirehoseSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 500;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 4 * 1024 * 1024;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000 * 1024;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+    private Boolean failOnError;
+    private String deliveryStreamName;
+    private Properties firehoseClientProperties;
+
+    KinesisFirehoseSinkBuilder() {}
+
+    /**
+     * Sets the name of the KDF delivery stream that the sink will connect to. 
There is no default
+     * for this parameter, therefore, this must be provided at sink creation 
time otherwise the
+     * build will fail.
+     *
+     * @param deliveryStreamName the name of the delivery stream
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setDeliveryStreamName(String 
deliveryStreamName) {
+        this.deliveryStreamName = deliveryStreamName;
+        return this;
+    }
+
+    /**
+     * If writing to Kinesis Data Firehose results in a partial or full 
failure being returned, the
+     * job will fail immediately with a {@link KinesisFirehoseException} if 
failOnError is set.
+     *
+     * @param failOnError whether to fail on error
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setFailOnError(boolean 
failOnError) {
+        this.failOnError = failOnError;
+        return this;
+    }
+
+    /**
+     * A set of properties used by the sink to create the firehose client. 
This may be used to set
+     * the aws region, credentials etc. See the docs for usage and syntax.
+     *
+     * @param firehoseClientProperties Firehose client properties
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setFirehoseClientProperties(
+            Properties firehoseClientProperties) {
+        this.firehoseClientProperties = firehoseClientProperties;
+        return this;
+    }
+
+    private Properties getClientPropertiesWithDefaultHttpProtocol() {
+        // Store the protocol by name: Properties#getProperty only returns String-typed values.
+        Properties props = Optional.ofNullable(firehoseClientProperties).orElse(new Properties());
+        props.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL.name());
+        return props;
+    }
+
+    @Override
+    public KinesisFirehoseSink<InputT> build() {
+        return new KinesisFirehoseSink<>(
+                getElementConverter(),
+                
Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
+                Optional.ofNullable(getMaxInFlightRequests())
+                        .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
+                
Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
+                
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
+                
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
+                
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
+                Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR),
+                deliveryStreamName,
+                getClientPropertiesWithDefaultHttpProtocol());
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
new file mode 100644
index 0000000..45b4186
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+/**
+ * An {@link ElementConverter} that produces Kinesis Data Firehose {@link Record}s of the AWS
+ * SDK v2. The user supplies a {@link SerializationSchema} for {@code InputT}; the bytes it
+ * produces become the data of the {@link Record}.
+ */
+@PublicEvolving
+public class KinesisFirehoseSinkElementConverter<InputT>
+        implements ElementConverter<InputT, Record> {
+
+    /** Turns each incoming element into the bytes carried by the resulting record. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private KinesisFirehoseSinkElementConverter(SerializationSchema<InputT> schema) {
+        serializationSchema = schema;
+    }
+
+    /**
+     * Serializes {@code element} with the configured schema and wraps the bytes in a record.
+     * The {@code context} argument is not used.
+     */
+    @Override
+    public Record apply(InputT element, SinkWriter.Context context) {
+        final byte[] payload = serializationSchema.serialize(element);
+        final SdkBytes data = SdkBytes.fromByteArray(payload);
+        return Record.builder().data(data).build();
+    }
+
+    /** Returns a new {@link Builder} with no serialization schema set. */
+    public static <InputT> Builder<InputT> builder() {
+        return new Builder<>();
+    }
+
+    /** A builder for the KinesisFirehoseSinkElementConverter. */
+    @PublicEvolving
+    public static class Builder<InputT> {
+
+        private SerializationSchema<InputT> serializationSchema;
+
+        /** Sets the schema used to serialize elements; required before {@link #build()}. */
+        public Builder<InputT> setSerializationSchema(
+                SerializationSchema<InputT> serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        /**
+         * Creates the converter.
+         *
+         * @throws NullPointerException if no serialization schema has been set
+         */
+        public KinesisFirehoseSinkElementConverter<InputT> build() {
+            final String missingSchemaMessage =
+                    "No SerializationSchema was supplied to the "
+                            + "KinesisFirehoseSinkElementConverter builder.";
+            Preconditions.checkNotNull(serializationSchema, missingSchemaMessage);
+            return new KinesisFirehoseSinkElementConverter<>(serializationSchema);
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
similarity index 54%
copy from 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
copy to 
flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index b720e5e..4002ec3 100644
--- 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -15,27 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.sink;
+package org.apache.flink.connector.firehose.sink;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
-import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
+import 
software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
+import software.amazon.awssdk.services.firehose.model.Record;
+import 
software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -44,35 +46,36 @@ import java.util.concurrent.CompletionException;
 import java.util.function.Consumer;
 
 /**
- * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis 
Data Streams. More
+ * Sink writer created by {@link KinesisFirehoseSink} to write to Kinesis Data 
Firehose. More
  * details on the operation of this sink writer may be found in the doc for 
{@link
- * KinesisDataStreamsSink}. More details on the internals of this sink writer 
may be found in {@link
+ * KinesisFirehoseSink}. More details on the internals of this sink writer may 
be found in {@link
  * AsyncSinkWriter}.
  *
- * <p>The {@link KinesisAsyncClient} used here may be configured in the 
standard way for the AWS SDK
- * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} 
and {@code
+ * <p>The {@link FirehoseAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
  * AWS_SECRET_ACCESS_KEY} through environment variables etc.
  */
-class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
PutRecordsRequestEntry> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class);
+@Internal
+class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
Record> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisFirehoseSinkWriter.class);
 
     /* A counter for the total number of records that have encountered an 
error during put */
     private final Counter numRecordsOutErrorsCounter;
 
-    /* Name of the stream in Kinesis Data Streams */
-    private final String streamName;
+    /* Name of the delivery stream in Kinesis Data Firehose */
+    private final String deliveryStreamName;
 
     /* The sink writer metric group */
     private final SinkWriterMetricGroup metrics;
 
-    /* The asynchronous Kinesis client - construction is by 
kinesisClientProperties */
-    private final KinesisAsyncClient client;
+    /* The asynchronous Firehose client - construction is by 
firehoseClientProperties */
+    private final FirehoseAsyncClient client;
 
     /* Flag to whether fatally fail any time we encounter an exception when 
persisting records */
     private final boolean failOnError;
 
-    KinesisDataStreamsSinkWriter(
-            ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
+    KinesisFirehoseSinkWriter(
+            ElementConverter<InputT, Record> elementConverter,
             Sink.InitContext context,
             int maxBatchSize,
             int maxInFlightRequests,
@@ -81,8 +84,8 @@ class KinesisDataStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRe
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
             boolean failOnError,
-            String streamName,
-            Properties kinesisClientProperties) {
+            String deliveryStreamName,
+            Properties firehoseClientProperties) {
         super(
                 elementConverter,
                 context,
@@ -93,38 +96,43 @@ class KinesisDataStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRe
                 maxTimeInBufferMS,
                 maxRecordSizeInBytes);
         this.failOnError = failOnError;
-        this.streamName = streamName;
+        this.deliveryStreamName = deliveryStreamName;
         this.metrics = context.metricGroup();
         this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
-        this.client = buildClient(kinesisClientProperties);
+        this.client = buildClient(firehoseClientProperties);
     }
 
-    private KinesisAsyncClient buildClient(Properties kinesisClientProperties) 
{
-
+    private FirehoseAsyncClient buildClient(Properties 
firehoseClientProperties) {
         final SdkAsyncHttpClient httpClient =
-                AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
-
-        return AWSKinesisDataStreamsUtil.createKinesisAsyncClient(
-                kinesisClientProperties, httpClient);
+                AWSGeneralUtil.createAsyncHttpClient(firehoseClientProperties);
+
+        return AWSAsyncSinkUtil.createAwsAsyncClient(
+                firehoseClientProperties,
+                httpClient,
+                FirehoseAsyncClient.builder(),
+                
KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT,
+                
KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
     }
 
     @Override
     protected void submitRequestEntries(
-            List<PutRecordsRequestEntry> requestEntries,
-            Consumer<List<PutRecordsRequestEntry>> requestResult) {
+            List<Record> requestEntries, Consumer<Collection<Record>> 
requestResult) {
 
-        PutRecordsRequest batchRequest =
-                
PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build();
+        PutRecordBatchRequest batchRequest =
+                PutRecordBatchRequest.builder()
+                        .records(requestEntries)
+                        .deliveryStreamName(deliveryStreamName)
+                        .build();
 
-        LOG.trace("Request to submit {} entries to KDS using KDS Sink.", 
requestEntries.size());
+        LOG.trace("Request to submit {} entries to KDF using KDF Sink.", 
requestEntries.size());
 
-        CompletableFuture<PutRecordsResponse> future = 
client.putRecords(batchRequest);
+        CompletableFuture<PutRecordBatchResponse> future = 
client.putRecordBatch(batchRequest);
 
         future.whenComplete(
                 (response, err) -> {
                     if (err != null) {
                         handleFullyFailedRequest(err, requestEntries, 
requestResult);
-                    } else if (response.failedRecordCount() > 0) {
+                    } else if (response.failedPutCount() > 0) {
                         handlePartiallyFailedRequest(response, requestEntries, 
requestResult);
                     } else {
                         requestResult.accept(Collections.emptyList());
@@ -133,15 +141,19 @@ class KinesisDataStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRe
     }
 
     @Override
-    protected long getSizeInBytes(PutRecordsRequestEntry requestEntry) {
+    protected long getSizeInBytes(Record requestEntry) {
         return requestEntry.data().asByteArrayUnsafe().length;
     }
 
     private void handleFullyFailedRequest(
             Throwable err,
-            List<PutRecordsRequestEntry> requestEntries,
-            Consumer<List<PutRecordsRequestEntry>> requestResult) {
-        LOG.warn("KDS Sink failed to persist {} entries to KDS", 
requestEntries.size(), err);
+            List<Record> requestEntries,
+            Consumer<Collection<Record>> requestResult) {
+        LOG.warn(
+                "KDF Sink failed to persist {} entries to KDF first request 
was {}",
+                requestEntries.size(),
+                requestEntries.get(0).toString(),
+                err);
         numRecordsOutErrorsCounter.inc(requestEntries.size());
 
         if (isRetryable(err)) {
@@ -150,20 +162,22 @@ class KinesisDataStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRe
     }
 
     private void handlePartiallyFailedRequest(
-            PutRecordsResponse response,
-            List<PutRecordsRequestEntry> requestEntries,
-            Consumer<List<PutRecordsRequestEntry>> requestResult) {
-        LOG.warn("KDS Sink failed to persist {} entries to KDS", 
response.failedRecordCount());
-        numRecordsOutErrorsCounter.inc(response.failedRecordCount());
+            PutRecordBatchResponse response,
+            List<Record> requestEntries,
+            Consumer<Collection<Record>> requestResult) {
+        LOG.warn(
+                "KDF Sink failed to persist {} entries to KDF first request 
was {}",
+                requestEntries.size(),
+                requestEntries.get(0).toString());
+        numRecordsOutErrorsCounter.inc(response.failedPutCount());
 
         if (failOnError) {
             getFatalExceptionCons()
-                    .accept(new 
KinesisDataStreamsException.KinesisDataStreamsFailFastException());
+                    .accept(new 
KinesisFirehoseException.KinesisFirehoseFailFastException());
             return;
         }
-        List<PutRecordsRequestEntry> failedRequestEntries =
-                new ArrayList<>(response.failedRecordCount());
-        List<PutRecordsResultEntry> records = response.records();
+        List<Record> failedRequestEntries = new 
ArrayList<>(response.failedPutCount());
+        List<PutRecordBatchResponseEntry> records = 
response.requestResponses();
 
         for (int i = 0; i < records.size(); i++) {
             if (records.get(i).errorCode() != null) {
@@ -179,15 +193,13 @@ class KinesisDataStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRe
                 && err.getCause() instanceof ResourceNotFoundException) {
             getFatalExceptionCons()
                     .accept(
-                            new KinesisDataStreamsException(
+                            new KinesisFirehoseException(
                                     "Encountered non-recoverable exception", 
err));
             return false;
         }
         if (failOnError) {
             getFatalExceptionCons()
-                    .accept(
-                            new 
KinesisDataStreamsException.KinesisDataStreamsFailFastException(
-                                    err));
+                    .accept(new 
KinesisFirehoseException.KinesisFirehoseFailFastException(err));
             return false;
         }
 
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..c64a340
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
new file mode 100644
index 0000000..0dc85da
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+/**
+ * Checks that building a sink through {@link KinesisFirehoseSinkBuilder} fails when the element
+ * converter is missing or when the delivery stream name is missing or empty.
+ */
+public class KinesisFirehoseSinkBuilderTest {
+    private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    /** No element converter configured: a NullPointerException is expected. */
+    @Test
+    public void elementConverterOfSinkMustBeSetWhenBuilt() {
+        final String expectedMessage =
+                "ElementConverter must be not null when initializing the AsyncSinkBase.";
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                KinesisFirehoseSink.builder()
+                                        .setDeliveryStreamName("deliveryStream")
+                                        .build())
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining(expectedMessage);
+    }
+
+    /** No delivery stream name configured: a NullPointerException is expected. */
+    @Test
+    public void streamNameOfSinkMustBeSetWhenBuilt() {
+        final String expectedMessage =
+                "The delivery stream name must not be null when initializing the KDF Sink.";
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                KinesisFirehoseSink.<String>builder()
+                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .build())
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining(expectedMessage);
+    }
+
+    /** Empty delivery stream name: an IllegalArgumentException is expected. */
+    @Test
+    public void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
+        final String expectedMessage =
+                "The delivery stream name must be set when initializing the KDF Sink.";
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                KinesisFirehoseSink.<String>builder()
+                                        .setDeliveryStreamName("")
+                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(expectedMessage);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
new file mode 100644
index 0000000..ed0b1c7
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that {@link KinesisFirehoseSinkElementConverter} requires a serialization schema and
+ * uses it to produce the record data.
+ */
+public class KinesisFirehoseSinkElementConverterTest {
+
+    /** Building without a schema must fail with a descriptive NullPointerException. */
+    @Test
+    public void elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt() {
+        final String expectedMessage =
+                "No SerializationSchema was supplied to the "
+                        + "KinesisFirehoseSinkElementConverter builder.";
+
+        Assertions.assertThatThrownBy(
+                        () -> KinesisFirehoseSinkElementConverter.<String>builder().build())
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining(expectedMessage);
+    }
+
+    /** The record data must equal the bytes produced by the supplied schema. */
+    @Test
+    public void elementConverterUsesProvidedSchemaToSerializeRecord() {
+        final String input = "{many hands make light work;";
+        final ElementConverter<String, Record> converter =
+                KinesisFirehoseSinkElementConverter.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .build();
+
+        final Record converted = converter.apply(input, null);
+
+        final byte[] expectedBytes = new SimpleStringSchema().serialize(input);
+        final SdkBytes expectedData = SdkBytes.fromByteArray(expectedBytes);
+        assertThat(converted.data()).isEqualTo(expectedData);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
new file mode 100644
index 0000000..08cb49b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.Record;
+import software.amazon.awssdk.services.iam.IamAsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.List;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getConfig;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getIamClient;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getS3Client;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects;
+import static 
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream;
+import static 
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test of the {@code KinesisFirehoseSink}. The Firehose, S3 and IAM clients it uses
+ * all point at the endpoint of a localstack container.
+ */
+public class KinesisFirehoseSinkITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
+
+    private static final String ROLE_NAME = "super-role";
+    private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME;
+    private static final String BUCKET_NAME = "s3-firehose";
+    private static final String STREAM_NAME = "s3-stream";
+    private static final int NUMBER_OF_ELEMENTS = 92;
+
+    /** Converts the test's string elements into Firehose records. */
+    private static final ElementConverter<String, Record> ELEMENT_CONVERTER =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @ClassRule
+    public static LocalstackContainer mockFirehoseContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
+
+    private S3AsyncClient s3AsyncClient;
+    private FirehoseAsyncClient firehoseAsyncClient;
+    private IamAsyncClient iamAsyncClient;
+
+    /** Switches CBOR off in the AWS SDK and creates the clients against the container. */
+    @Before
+    public void setup() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+        s3AsyncClient = getS3Client(mockFirehoseContainer.getEndpoint());
+        firehoseAsyncClient = getFirehoseClient(mockFirehoseContainer.getEndpoint());
+        iamAsyncClient = getIamClient(mockFirehoseContainer.getEndpoint());
+    }
+
+    /** Clears the CBOR system property set in {@link #setup()}. */
+    @After
+    public void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+    }
+
+    /**
+     * Writes {@code NUMBER_OF_ELEMENTS} JSON strings through the sink with a batch size of one
+     * and expects the same number of objects to be listed in the S3 bucket afterwards.
+     */
+    @Test
+    public void test() throws Exception {
+        LOG.info("1 - Creating the bucket for Firehose to deliver into...");
+        createBucket(s3AsyncClient, BUCKET_NAME);
+        LOG.info("2 - Creating the IAM Role for Firehose to write into the s3 bucket...");
+        createIAMRole(iamAsyncClient, ROLE_NAME);
+        LOG.info("3 - Creating the Firehose delivery stream...");
+        createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN, firehoseAsyncClient);
+
+        final ObjectMapper mapper = new ObjectMapper();
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final DataStream<String> jsonRecords =
+                env.fromSequence(1, NUMBER_OF_ELEMENTS)
+                        .map(Object::toString)
+                        .returns(String.class)
+                        .map(value -> mapper.writeValueAsString(ImmutableMap.of("data", value)));
+
+        final KinesisFirehoseSink<String> firehoseSink =
+                KinesisFirehoseSink.<String>builder()
+                        .setElementConverter(ELEMENT_CONVERTER)
+                        .setDeliveryStreamName(STREAM_NAME)
+                        .setMaxBatchSize(1)
+                        .setFirehoseClientProperties(
+                                getConfig(mockFirehoseContainer.getEndpoint()))
+                        .build();
+
+        jsonRecords.sinkTo(firehoseSink);
+        env.execute("Integration Test");
+
+        final List<S3Object> objects = listBucketObjects(s3AsyncClient, BUCKET_NAME);
+        assertThat(objects.size()).isEqualTo(NUMBER_OF_ELEMENTS);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
new file mode 100644
index 0000000..164ec39
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Properties;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSink}. */
+public class KinesisFirehoseSinkTest {
+
+    private static final ElementConverter<String, Record> elementConverter =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @Test
+    public void deliveryStreamNameMustNotBeNull() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisFirehoseSink<>(
+                                        elementConverter,
+                                        500,
+                                        16,
+                                        10000,
+                                        4 * 1024 * 1024L,
+                                        5000L,
+                                        1000 * 1024L,
+                                        false,
+                                        null,
+                                        new Properties()))
+                .withMessageContaining(
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+    }
+
+    @Test
+    public void deliveryStreamNameMustNotBeEmpty() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisFirehoseSink<>(
+                                        elementConverter,
+                                        500,
+                                        16,
+                                        10000,
+                                        4 * 1024 * 1024L,
+                                        5000L,
+                                        1000 * 1024L,
+                                        false,
+                                        "",
+                                        new Properties()))
+                .withMessageContaining(
+                        "The delivery stream name must be set when initializing the KDF Sink.");
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
new file mode 100644
index 0000000..d840033
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+
+import org.junit.Before;
+import org.junit.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkWriter}. */
+public class KinesisFirehoseSinkWriterTest {
+
+    private KinesisFirehoseSinkWriter<String> sinkWriter;
+
+    private static final ElementConverter<String, Record> 
ELEMENT_CONVERTER_PLACEHOLDER =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @Before
+    public void setup() {
+        TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+        Properties sinkProperties = new Properties();
+        sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+        sinkWriter =
+                new KinesisFirehoseSinkWriter<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        sinkInitContext,
+                        50,
+                        16,
+                        10000,
+                        4 * 1024 * 1024,
+                        5000,
+                        1000 * 1024,
+                        true,
+                        "streamName",
+                        sinkProperties);
+    }
+
+    @Test
+    public void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
+        String testString = "{many hands make light work;";
+        Record record = 
Record.builder().data(SdkBytes.fromUtf8String(testString)).build();
+        assertThat(sinkWriter.getSizeInBytes(record))
+                .isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);
+    }
+
+    @Test
+    public void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
+            throws IOException, InterruptedException {
+        Properties prop = new Properties();
+        prop.setProperty(AWSConfigConstants.AWS_REGION, 
Region.EU_WEST_1.toString());
+        prop.setProperty(AWS_ENDPOINT, "https://fake_aws_endpoint");
+        TestSinkInitContext ctx = new TestSinkInitContext();
+        KinesisFirehoseSink<String> kinesisFirehoseSink =
+                new KinesisFirehoseSink<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        6,
+                        16,
+                        10000,
+                        4 * 1024 * 1024L,
+                        5000L,
+                        1000 * 1024L,
+                        true,
+                        "test-stream",
+                        prop);
+        SinkWriter<String, Void, Collection<Record>> writer =
+                kinesisFirehoseSink.createWriter(ctx, new ArrayList<>());
+
+        for (int i = 0; i < 12; i++) {
+            writer.write("data_bytes", null);
+        }
+
+        assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
new file mode 100644
index 0000000..3e9d0ee
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.examples;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
+import 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkElementConverter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.Properties;
+
+/**
+ * An example application demonstrating how to use the {@link KinesisFirehoseSink} to sink into KDF.
+ *
+ * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_ACCESS_KEY}
+ * through environment variables etc.
+ */
+public class SinkIntoFirehose {
+
+    private static final KinesisFirehoseSinkElementConverter<String> 
elementConverter =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    public static void main(String[] args) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10_000);
+
+        DataStream<String> generator =
+                env.fromSequence(1, 10_000_000L)
+                        .map(Object::toString)
+                        .returns(String.class)
+                        .map(data -> 
mapper.writeValueAsString(ImmutableMap.of("data", data)));
+
+        Properties sinkProperties = new Properties();
+        sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+
+        KinesisFirehoseSink<String> kdfSink =
+                KinesisFirehoseSink.<String>builder()
+                        .setElementConverter(elementConverter)
+                        .setDeliveryStreamName("delivery-stream")
+                        .setMaxBatchSize(20)
+                        .setFirehoseClientProperties(sinkProperties)
+                        .build();
+
+        generator.sinkTo(kdfSink);
+
+        env.execute("KDF Async Sink Example Program");
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
new file mode 100644
index 0000000..1de389d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.testutils;
+
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseConfigConstants;
+
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import 
software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
+import 
software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
+import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
+import 
software.amazon.awssdk.services.firehose.model.ExtendedS3DestinationConfiguration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getConfig;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getHttpClient;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the Localstack container.
+ */
+public class KinesisFirehoseTestUtils {
+
+    public static FirehoseAsyncClient getFirehoseClient(String endpoint) 
throws URISyntaxException {
+        return AWSAsyncSinkUtil.createAwsAsyncClient(
+                getConfig(endpoint),
+                getHttpClient(endpoint),
+                FirehoseAsyncClient.builder().endpointOverride(new 
URI(endpoint)),
+                
KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT,
+                
KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
+    }
+
+    public static void createDeliveryStream(
+            String deliveryStreamName,
+            String bucketName,
+            String roleARN,
+            FirehoseAsyncClient firehoseAsyncClient)
+            throws ExecutionException, InterruptedException {
+        ExtendedS3DestinationConfiguration s3Config =
+                ExtendedS3DestinationConfiguration.builder()
+                        .bucketARN(bucketName)
+                        .roleARN(roleARN)
+                        .build();
+        CreateDeliveryStreamRequest request =
+                CreateDeliveryStreamRequest.builder()
+                        .deliveryStreamName(deliveryStreamName)
+                        .extendedS3DestinationConfiguration(s3Config)
+                        .deliveryStreamType(DeliveryStreamType.DIRECT_PUT)
+                        .build();
+
+        CompletableFuture<CreateDeliveryStreamResponse> deliveryStream =
+                firehoseAsyncClient.createDeliveryStream(request);
+        deliveryStream.get();
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
similarity index 97%
copy from 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
copy to 
flink-connectors/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
index c5339e7..c4fa187 100644
--- 
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = INFO
+rootLogger.level = OFF
 rootLogger.appenderRef.test.ref = TestLogger
 
 appender.testlogger.name = TestLogger
diff --git a/flink-connectors/flink-connector-base/pom.xml 
b/flink-connectors/flink-connector-base/pom.xml
index 751aecd..58fd369 100644
--- a/flink-connectors/flink-connector-base/pom.xml
+++ b/flink-connectors/flink-connector-base/pom.xml
@@ -72,4 +72,20 @@
                        <scope>test</scope>
                </dependency>
        </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
 </project>
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index b508159..00a08b1 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -17,22 +17,8 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
-import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.metrics.testutils.MetricListener;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
-import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
-import org.apache.flink.util.UserCodeClassLoader;
-import org.apache.flink.util.function.RunnableWithException;
-import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -42,10 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
-import java.util.OptionalLong;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -66,12 +49,12 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class AsyncSinkWriterTest {
 
     private final List<Integer> res = new ArrayList<>();
-    private SinkInitContext sinkInitContext;
+    private TestSinkInitContext sinkInitContext;
 
     @Before
     public void before() {
         res.clear();
-        sinkInitContext = new SinkInitContext();
+        sinkInitContext = new TestSinkInitContext();
     }
 
     private void performNormalWriteOfEightyRecordsToMock()
@@ -1008,105 +991,6 @@ public class AsyncSinkWriterTest {
         }
     }
 
-    private static class SinkInitContext implements Sink.InitContext {
-
-        private static final TestProcessingTimeService processingTimeService;
-        private final MetricListener metricListener = new MetricListener();
-        private final OperatorIOMetricGroup operatorIOMetricGroup =
-                
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
-        private final SinkWriterMetricGroup metricGroup =
-                InternalSinkWriterMetricGroup.mock(
-                        metricListener.getMetricGroup(), 
operatorIOMetricGroup);
-
-        static {
-            processingTimeService = new TestProcessingTimeService();
-        }
-
-        @Override
-        public UserCodeClassLoader getUserCodeClassLoader() {
-            return null;
-        }
-
-        @Override
-        public MailboxExecutor getMailboxExecutor() {
-            StreamTaskActionExecutor streamTaskActionExecutor =
-                    new StreamTaskActionExecutor() {
-                        @Override
-                        public void run(RunnableWithException e) throws 
Exception {
-                            e.run();
-                        }
-
-                        @Override
-                        public <E extends Throwable> void runThrowing(
-                                ThrowingRunnable<E> throwingRunnable) throws E 
{
-                            throwingRunnable.run();
-                        }
-
-                        @Override
-                        public <R> R call(Callable<R> callable) throws 
Exception {
-                            return callable.call();
-                        }
-                    };
-            return new MailboxExecutorImpl(
-                    new TaskMailboxImpl(Thread.currentThread()),
-                    Integer.MAX_VALUE,
-                    streamTaskActionExecutor);
-        }
-
-        @Override
-        public Sink.ProcessingTimeService getProcessingTimeService() {
-            return new Sink.ProcessingTimeService() {
-                @Override
-                public long getCurrentProcessingTime() {
-                    return processingTimeService.getCurrentProcessingTime();
-                }
-
-                @Override
-                public void registerProcessingTimer(
-                        long time, ProcessingTimeCallback 
processingTimerCallback) {
-                    processingTimeService.registerTimer(
-                            time, processingTimerCallback::onProcessingTime);
-                }
-            };
-        }
-
-        @Override
-        public int getSubtaskId() {
-            return 0;
-        }
-
-        @Override
-        public int getNumberOfParallelSubtasks() {
-            return 0;
-        }
-
-        @Override
-        public SinkWriterMetricGroup metricGroup() {
-            return metricGroup;
-        }
-
-        @Override
-        public OptionalLong getRestoredCheckpointId() {
-            return OptionalLong.empty();
-        }
-
-        public TestProcessingTimeService getTestProcessingTimeService() {
-            return processingTimeService;
-        }
-
-        private Optional<Gauge<Long>> getCurrentSendTimeGauge() {
-            return metricListener.getGauge("currentSendTime");
-        }
-
-        private Counter getNumRecordsOutCounter() {
-            return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
-        }
-
-        private Counter getNumBytesOutCounter() {
-            return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
-        }
-    }
-
     /**
      * This SinkWriter releases the lock on existing threads blocked by {@code 
delayedStartLatch}
      * and blocks itself until {@code blockedThreadLatch} is unblocked.
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
new file mode 100644
index 0000000..fba2711
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+
+/** A mock implementation of a {@code Sink.InitContext} to be used in sink unit tests. */
+public class TestSinkInitContext implements Sink.InitContext {
+
+    private static final TestProcessingTimeService processingTimeService;
+    private final MetricListener metricListener = new MetricListener();
+    private final OperatorIOMetricGroup operatorIOMetricGroup =
+            
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+    private final SinkWriterMetricGroup metricGroup =
+            InternalSinkWriterMetricGroup.mock(
+                    metricListener.getMetricGroup(), operatorIOMetricGroup);
+
+    static {
+        processingTimeService = new TestProcessingTimeService();
+    }
+
+    @Override
+    public UserCodeClassLoader getUserCodeClassLoader() {
+        return null;
+    }
+
+    @Override
+    public MailboxExecutor getMailboxExecutor() {
+        StreamTaskActionExecutor streamTaskActionExecutor =
+                new StreamTaskActionExecutor() {
+                    @Override
+                    public void run(RunnableWithException e) throws Exception {
+                        e.run();
+                    }
+
+                    @Override
+                    public <E extends Throwable> void runThrowing(
+                            ThrowingRunnable<E> throwingRunnable) throws E {
+                        throwingRunnable.run();
+                    }
+
+                    @Override
+                    public <R> R call(Callable<R> callable) throws Exception {
+                        return callable.call();
+                    }
+                };
+        return new MailboxExecutorImpl(
+                new TaskMailboxImpl(Thread.currentThread()),
+                Integer.MAX_VALUE,
+                streamTaskActionExecutor);
+    }
+
+    @Override
+    public Sink.ProcessingTimeService getProcessingTimeService() {
+        return new Sink.ProcessingTimeService() {
+            @Override
+            public long getCurrentProcessingTime() {
+                return processingTimeService.getCurrentProcessingTime();
+            }
+
+            @Override
+            public void registerProcessingTimer(
+                    long time, ProcessingTimeCallback processingTimerCallback) 
{
+                processingTimeService.registerTimer(
+                        time, processingTimerCallback::onProcessingTime);
+            }
+        };
+    }
+
+    @Override
+    public int getSubtaskId() {
+        return 0;
+    }
+
+    @Override
+    public int getNumberOfParallelSubtasks() {
+        return 0;
+    }
+
+    @Override
+    public SinkWriterMetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    @Override
+    public OptionalLong getRestoredCheckpointId() {
+        return OptionalLong.empty();
+    }
+
+    public TestProcessingTimeService getTestProcessingTimeService() {
+        return processingTimeService;
+    }
+
+    public Optional<Gauge<Long>> getCurrentSendTimeGauge() {
+        return metricListener.getGauge("currentSendTime");
+    }
+
+    public Counter getNumRecordsOutCounter() {
+        return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+    }
+
+    public Counter getNumBytesOutCounter() {
+        return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+    }
+}
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
index e2e6a79..0677765 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
+import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -190,7 +190,7 @@ public class KinesisProxyV2 implements KinesisProxyV2Interface {
             try {
                 response = responseSupplier.get();
             } catch (Exception ex) {
-                if (AWSKinesisDataStreamsUtil.isRecoverableException(ex)) {
+                if (AwsV2Util.isRecoverableException(ex)) {
                     long backoffMillis =
                             backoff.calculateFullJitterBackoff(
                                     jitterBase, jitterMax, jitterExponent, ++attempt);
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
index 03767cd..62f25db 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
-import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
+import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
 import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
 import org.apache.flink.util.Preconditions;
@@ -63,13 +63,17 @@ public class KinesisProxyV2Factory {
 
         Properties legacyConfigProps = new Properties(configProps);
         legacyConfigProps.setProperty(
-                AWSKinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
-                AWSKinesisDataStreamsUtil.formatFlinkUserAgentPrefix(
-                        AWSKinesisDataStreamsConfigConstants
-                                .BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+                KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
+                AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
+                        KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
 
         final KinesisAsyncClient client =
-                AWSKinesisDataStreamsUtil.createKinesisAsyncClient(legacyConfigProps, httpClient);
+                AWSAsyncSinkUtil.createAwsAsyncClient(
+                        legacyConfigProps,
+                        httpClient,
+                        KinesisAsyncClient.builder(),
+                        KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+                        KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
 
         return new KinesisProxyV2(client, httpClient, configuration, BACKOFF);
     }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 5c949fe..684234c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider;
-import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
+import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -83,9 +83,8 @@ public class AWSUtil {
             Properties configProps, ClientConfiguration awsClientConfig) {
         // set a Flink-specific user agent
         awsClientConfig.setUserAgentPrefix(
-                AWSKinesisDataStreamsUtil.formatFlinkUserAgentPrefix(
-                        AWSKinesisDataStreamsConfigConstants
-                                .BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+                AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
+                        KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
 
         // utilize automatic refreshment of credentials by directly passing the
         // AWSCredentialsProvider
@@ -133,7 +132,7 @@ public class AWSUtil {
     private static AWSCredentialsProvider getCredentialsProvider(
             final Properties configProps, final String configPrefix) {
         CredentialProvider credentialProviderType =
-                AWSKinesisDataStreamsUtil.getCredentialProviderType(configProps, configPrefix);
+                AWSAsyncSinkUtil.getCredentialProviderType(configProps, configPrefix);
 
         switch (credentialProviderType) {
             case ENV_VAR:
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
index 609e879..aa62a65 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 import org.apache.flink.annotation.Internal;
 
 import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
 import software.amazon.awssdk.utils.AttributeMap;
 
 import java.time.Duration;
@@ -70,4 +72,10 @@ public class AwsV2Util {
     public static boolean isNoneEfoRegistrationType(final Properties properties) {
         return NONE.name().equals(properties.get(EFO_REGISTRATION_TYPE));
     }
+
+    public static boolean isRecoverableException(Exception e) {
+        Throwable cause = e.getCause();
+        return cause instanceof LimitExceededException
+                || cause instanceof ProvisionedThroughputExceededException;
+    }
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
index 47cf23f..ea9f03d 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
@@ -19,10 +19,12 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.junit.Test;
 import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
 import software.amazon.awssdk.utils.AttributeMap;
 
 import java.time.Duration;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.EAGER;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.LAZY;
@@ -131,4 +133,36 @@ public class AwsV2UtilTest {
         prop.setProperty(EFO_REGISTRATION_TYPE, NONE.name());
         assertTrue(AwsV2Util.isNoneEfoRegistrationType(prop));
     }
+
+    @Test
+    public void testIsRecoverableExceptionForRecoverable() {
+        Exception recoverable = LimitExceededException.builder().build();
+        assertTrue(AwsV2Util.isRecoverableException(new ExecutionException(recoverable)));
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForNonRecoverable() {
+        Exception nonRecoverable = new IllegalArgumentException("abc");
+        assertFalse(AwsV2Util.isRecoverableException(new ExecutionException(nonRecoverable)));
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForRuntimeExceptionWrappingRecoverable() {
+        Exception recoverable = LimitExceededException.builder().build();
+        Exception runtime = new RuntimeException("abc", recoverable);
+        assertTrue(AwsV2Util.isRecoverableException(runtime));
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForRuntimeExceptionWrappingNonRecoverable() {
+        Exception nonRecoverable = new IllegalArgumentException("abc");
+        Exception runtime = new RuntimeException("abc", nonRecoverable);
+        assertFalse(AwsV2Util.isRecoverableException(runtime));
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForNullCause() {
+        Exception nonRecoverable = new IllegalArgumentException("abc");
+        assertFalse(AwsV2Util.isRecoverableException(nonRecoverable));
+    }
 }
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 7495412..a4c97b7 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -55,6 +55,7 @@ under the License.
                <module>flink-connector-aws-base</module>
                <module>flink-connector-kinesis</module>
                <module>flink-connector-aws-kinesis-data-streams</module>
+               <module>flink-connector-aws-kinesis-firehose</module>
                <module>flink-connector-base</module>
                <module>flink-file-sink-common</module>
                <module>flink-connector-files</module>
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
index afc6d9b..99c5293 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
@@ -37,6 +37,8 @@ public class DockerImageVersions {
 
     public static final String KINESALITE = "instructure/kinesalite:latest";
 
+    public static final String LOCALSTACK = "localstack/localstack:latest";
+
     public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
     public static final String CASSANDRA_3 = "cassandra:3.0";
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index 7793c0b..d70c9ff 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -109,6 +109,7 @@ flink-connectors/flink-connector-rabbitmq,\
 flink-connectors/flink-connector-twitter,\
 flink-connectors/flink-connector-kinesis,\
 flink-connectors/flink-connector-aws-kinesis-data-streams,\
+flink-connectors/flink-connector-aws-kinesis-firehose,\
 flink-metrics/flink-metrics-dropwizard,\
 flink-metrics/flink-metrics-graphite,\
 flink-metrics/flink-metrics-jmx,\

Reply via email to