This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 6611bea39ca116c328ea4242a46aae104a7409a3
Author: Martin Zink <[email protected]>
AuthorDate: Tue Feb 4 12:52:22 2025 +0100

    MINIFICPP-2518 PutKinesisStream
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    Closes #1927
---
 PROCESSORS.md                                      |  47 +++
 README.md                                          |   2 +-
 cmake/BundledAwsSdkCpp.cmake                       |  14 +-
 docker/test/integration/cluster/ContainerStore.py  |   9 +
 .../test/integration/cluster/DockerTestCluster.py  |   4 +
 docker/test/integration/cluster/ImageStore.py      |   5 +
 .../integration/cluster/checkers/AwsChecker.py     |   5 +
 .../cluster/containers/KinesisServerContainer.py   |  41 +++
 .../features/MiNiFi_integration_test_driver.py     |   3 +
 docker/test/integration/features/kinesis.feature   |  38 +++
 docker/test/integration/features/steps/steps.py    |  11 +
 .../minifi/processors/PutKinesisStream.py          |  42 +++
 .../integration/resources/kinesis-mock/Dockerfile  |   8 +
 .../resources/kinesis-mock/consumer/consumer.js    |  64 +++++
 .../resources/kinesis-mock/consumer/package.json   |   3 +
 .../integration/resources/kinesis-mock/server.json |   6 +
 extensions/aws/AWSCredentialsProvider.cpp          |   2 -
 extensions/aws/AWSCredentialsProvider.h            |   2 -
 extensions/aws/CMakeLists.txt                      |  10 +-
 .../{S3Processor.cpp => AwsProcessor.cpp}          |  49 +---
 .../processors/{S3Processor.h => AwsProcessor.h}   |  35 +--
 extensions/aws/processors/DeleteS3Object.cpp       |  21 +-
 extensions/aws/processors/DeleteS3Object.h         |   5 +-
 extensions/aws/processors/FetchS3Object.cpp        |  19 +-
 extensions/aws/processors/FetchS3Object.h          |   5 +-
 extensions/aws/processors/ListS3.cpp               |   6 +-
 extensions/aws/processors/PutKinesisStream.cpp     | 208 ++++++++++++++
 extensions/aws/processors/PutKinesisStream.h       | 139 +++++++++
 extensions/aws/processors/PutS3Object.cpp          |  35 ++-
 extensions/aws/processors/PutS3Object.h            |  11 +-
 extensions/aws/processors/S3Processor.cpp          | 113 +-------
 extensions/aws/processors/S3Processor.h            | 155 +---------
 extensions/aws/s3/S3ClientRequestSender.cpp        |   2 -
 extensions/aws/s3/S3ClientRequestSender.h          |   2 -
 extensions/aws/s3/S3RequestSender.h                |   9 -
 extensions/aws/s3/S3Wrapper.cpp                    |   2 -
 extensions/aws/s3/S3Wrapper.h                      |  24 +-
 extensions/aws/tests/PutKinesisStreamTests.cpp     | 318 +++++++++++++++++++++
 extensions/aws/utils/AWSInitializer.cpp            |  19 +-
 extensions/aws/utils/AWSInitializer.h              |   2 -
 extensions/aws/utils/AWSSdkLogger.cpp              |   2 -
 extensions/aws/utils/AWSSdkLogger.h                |   2 -
 extensions/aws/utils/ProxyOptions.h                |  33 +++
 thirdparty/aws-sdk-cpp/dll-export-injection.patch  | 185 ++++++++----
 44 files changed, 1252 insertions(+), 465 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 862a1182e..051e7377b 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -82,6 +82,7 @@ limitations under the License.
 - [PutCouchbaseKey](#PutCouchbaseKey)
 - [PutFile](#PutFile)
 - [PutGCSObject](#PutGCSObject)
+- [PutKinesisStream](#PutKinesisStream)
 - [PutOPCProcessor](#PutOPCProcessor)
 - [PutS3Object](#PutS3Object)
 - [PutSFTP](#PutSFTP)
@@ -2432,6 +2433,52 @@ In the list below, the names of required properties appear in bold. Any other pr
 | gcs.encryption.sha256    | success      | The SHA256 hash of the key used to encrypt the object.             |
 
 
+## PutKinesisStream
+
+### Description
+
+Sends the contents to a specified Amazon Kinesis stream. In order to send data to Kinesis, the stream name has to be specified.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                                | Default Value | Allowable Values [...]
+|-------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| Access Key                          |               | [...]
+| Secret Key                          |               | [...]
+| Credentials File                    |               | [...]
+| AWS Credentials Provider service    |               | [...]
+| **Region**                          | us-west-2     | af-south-1<br/>ap-east-1<br/>ap-northeast-1<br/>ap-northeast-2<br/>ap-northeast-3<br/>ap-south-1<br/>ap-south-2<br/>ap-southeast-1<br/>ap-southeast-2<br/>ap-southeast-3<br/>ap-southeast-4<br/>ap-southeast-5<br/>ap-southeast-7<br/>ca-central-1<br/>ca-west-1<br/>cn-north-1<br/>cn-northwest-1<br/>eu-central-1<br/>eu-central-2<br/>eu-isoe-west-1<br/>eu-north-1<br/>eu-south-1<br/>eu-south-2<br/>eu-west-1<br/>eu-west-2<br/>eu-west-3<br/>i [...]
+| **Communications Timeout**          | 30 sec        | [...]
+| Endpoint Override URL               |               | [...]
+| Proxy Host                          |               | [...]
+| Proxy Port                          |               | [...]
+| Proxy Username                      |               | [...]
+| Proxy Password                      |               | [...]
+| **Use Default Credentials**         | false         | true<br/>false [...]
+| **Amazon Kinesis Stream Name**      |               | [...]
+| Amazon Kinesis Stream Partition Key |               | [...]
+| Batch Size                          | 250           | [...]
+| Max Batch Data Size                 | 1 MB          | [...]
+
+### Relationships
+
+| Name    | Description                                  |
+|---------|----------------------------------------------|
+| success | FlowFiles are routed to success relationship |
+| failure | FlowFiles are routed to failure relationship |
+
+### Output Attributes
+
+| Attribute                   | Relationship | Description                                                 |
+|-----------------------------|--------------|-------------------------------------------------------------|
+| aws.kinesis.error.message   | failure      | Error message on posting message to AWS Kinesis             |
+| aws.kinesis.error.code      | failure      | Error code for the message when posting to AWS Kinesis      |
+| aws.kinesis.sequence.number | success      | Sequence number for the message when posting to AWS Kinesis |
+| aws.kinesis.shard.id        | success      | Shard id of the message posted to AWS Kinesis               |
+
+
 ## PutOPCProcessor
 
 ### Description
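
Note: the success/failure attributes documented above mirror the fields of the
Kinesis PutRecords response. A minimal sketch of the equivalent call via boto3
(illustration only, not part of this commit; the stream name, record, and
partition key below are made up):

    import boto3

    client = boto3.client("kinesis", region_name="us-west-2")
    response = client.put_records(
        StreamName="example_stream",
        Records=[{"Data": b"hello", "PartitionKey": "example-key"}],
    )
    for record in response["Records"]:
        if "ErrorCode" in record:
            # failed record: maps to aws.kinesis.error.code / aws.kinesis.error.message
            print(record["ErrorCode"], record["ErrorMessage"])
        else:
            # stored record: maps to aws.kinesis.sequence.number / aws.kinesis.shard.id
            print(record["SequenceNumber"], record["ShardId"])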
diff --git a/README.md b/README.md
index 790238afa..5e7102866 100644
--- a/README.md
+++ b/README.md
@@ -73,7 +73,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte
 | Extension Set                    | Processors and Controller Services [...]
 |----------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
 | Archive Extensions               | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[BinFiles](PROCESSORS.md#binfiles)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) [...]
-| AWS                              | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) [...]
+| AWS                              | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3)<br/>[PutKinesisStream](PROCESSORS.md#putkinesisstream) [...]
 | Azure                            | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](PROCESSORS.md#deleteazureblobstorage)<br/>[FetchAzureBlobStorage](PROCESSORS.md#fetchazureblobstorage)<br/>[ListAzureBlobStorage](PROCESSORS.md#listazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](PROCESSORS.md#del [...]
 | CivetWeb                         | [ListenHTTP](PROCESSORS.md#listenhttp) [...]
 | Couchbase                        | [CouchbaseClusterService](CONTROLLERS.md#couchbaseclusterservice)<br/>[PutCouchbaseKey](PROCESSORS.md#putcouchbasekey)<br/>[GetCouchbaseKey](PROCESSORS.md#getcouchbasekey) [...]
diff --git a/cmake/BundledAwsSdkCpp.cmake b/cmake/BundledAwsSdkCpp.cmake
index b223098ac..672f73d7b 100644
--- a/cmake/BundledAwsSdkCpp.cmake
+++ b/cmake/BundledAwsSdkCpp.cmake
@@ -60,7 +60,9 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR)
             "${LIBDIR}/${PREFIX}aws-c-compression.${SUFFIX}"
             "${LIBDIR}/${PREFIX}aws-c-sdkutils.${SUFFIX}"
             "${LIBDIR}/${PREFIX}aws-cpp-sdk-core.${SUFFIX}"
-            "${LIBDIR}/${PREFIX}aws-cpp-sdk-s3.${SUFFIX}")
+            "${LIBDIR}/${PREFIX}aws-cpp-sdk-s3.${SUFFIX}"
+            "${LIBDIR}/${PREFIX}aws-cpp-sdk-kinesis.${SUFFIX}"
+    )
 
     FOREACH(BYPRODUCT ${BYPRODUCTS})
         LIST(APPEND AWSSDK_LIBRARIES_LIST "${BINARY_DIR}/thirdparty/libaws-install/${BYPRODUCT}")
@@ -69,13 +71,13 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR)
     set(AWS_SDK_CPP_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
             -DCMAKE_PREFIX_PATH=${BINARY_DIR}/thirdparty/libaws-install
             -DCMAKE_INSTALL_PREFIX=${BINARY_DIR}/thirdparty/libaws-install
-            -DBUILD_ONLY=s3
+            -DBUILD_ONLY=kinesis%s3
             -DENABLE_TESTING=OFF
             -DBUILD_SHARED_LIBS=OFF
             -DENABLE_UNITY_BUILD=${AWS_ENABLE_UNITY_BUILD})
 
     if(WIN32)
-        list(APPEND AWS_SDK_CPP_CMAKE_ARGS -DFORCE_EXPORT_CORE_API=ON -DFORCE_EXPORT_S3_API=ON)
+        list(APPEND AWS_SDK_CPP_CMAKE_ARGS -DFORCE_EXPORT_CORE_API=ON -DFORCE_EXPORT_S3_API=ON -DFORCE_EXPORT_KINESIS_API=ON)
     endif()
 
     append_third_party_passthrough_args(AWS_SDK_CPP_CMAKE_ARGS "${AWS_SDK_CPP_CMAKE_ARGS}")
@@ -209,4 +211,10 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR)
     add_dependencies(AWS::aws-cpp-sdk-s3 aws-sdk-cpp-external)
     target_include_directories(AWS::aws-cpp-sdk-s3 INTERFACE ${LIBAWS_INCLUDE_DIR})
     target_link_libraries(AWS::aws-cpp-sdk-s3 INTERFACE AWS::aws-cpp-sdk-core)
+
+    add_library(AWS::aws-cpp-sdk-kinesis STATIC IMPORTED)
+    set_target_properties(AWS::aws-cpp-sdk-kinesis PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/libaws-install/${LIBDIR}/${PREFIX}aws-cpp-sdk-kinesis.${SUFFIX}")
+    add_dependencies(AWS::aws-cpp-sdk-kinesis aws-sdk-cpp-external)
+    target_include_directories(AWS::aws-cpp-sdk-kinesis INTERFACE ${LIBAWS_INCLUDE_DIR})
+    target_link_libraries(AWS::aws-cpp-sdk-kinesis INTERFACE AWS::aws-cpp-sdk-core)
 endfunction(use_bundled_libaws)
diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py
index 79f88fb89..b73a6ac8e 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -20,6 +20,7 @@ from .containers.NifiContainer import NifiContainer
 from .containers.NifiContainer import NiFiOptions
 from .containers.ZookeeperContainer import ZookeeperContainer
 from .containers.KafkaBrokerContainer import KafkaBrokerContainer
+from .containers.KinesisServerContainer import KinesisServerContainer
 from .containers.S3ServerContainer import S3ServerContainer
 from .containers.AzureStorageServerContainer import AzureStorageServerContainer
 from .containers.FakeGcsServerContainer import FakeGcsServerContainer
@@ -153,6 +154,14 @@ class ContainerStore:
                                                                 network=self.network,
                                                                 image_store=self.image_store,
                                                                 command=command))
+        elif engine == 'kinesis-server':
+            return self.containers.setdefault(container_name,
+                                              KinesisServerContainer(feature_context=feature_context,
+                                                                     name=container_name,
+                                                                     vols=self.vols,
+                                                                     network=self.network,
+                                                                     image_store=self.image_store,
+                                                                     command=command))
         elif engine == 'azure-storage-server':
             return self.containers.setdefault(container_name,
                                               AzureStorageServerContainer(feature_context=feature_context,
diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py
index faa42b727..6d39022e1 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -202,6 +202,10 @@ class DockerTestCluster:
                  and output.count("TCP_MISS") >= output.count("TCP_DENIED"))
                  or output.count("TCP_DENIED") == 0 and "TCP_MISS" in output)
 
+    def check_kinesis_server_record_data(self, container_name, record_data):
+        container_name = self.container_store.get_container_name_with_postfix(container_name)
+        return self.aws_checker.check_kinesis_server_record_data(container_name, record_data)
+
     def check_s3_server_object_data(self, container_name, test_data):
         container_name = self.container_store.get_container_name_with_postfix(container_name)
         return self.aws_checker.check_s3_server_object_data(container_name, test_data)
diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py
index 100b1c22b..723591e4a 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -67,6 +67,8 @@ class ImageStore:
             image = self.__build_mqtt_broker_image()
         elif container_engine == "splunk":
             image = self.__build_splunk_image()
+        elif container_engine == "kinesis-server":
+            image = self.__build_kinesis_image()
         elif container_engine == "reverse-proxy":
             image = self.__build_reverse_proxy_image()
         elif container_engine == "diag-slave-tcp":
@@ -276,6 +278,9 @@ class ImageStore:
 
         return self.__build_image(dockerfile)
 
+    def __build_kinesis_image(self):
+        return self.__build_image_by_path(self.test_dir + "/resources/kinesis-mock", 'kinesis-server')
+
     def __build_splunk_image(self):
         return self.__build_image_by_path(self.test_dir + "/resources/splunk-hec", 'minifi-splunk')
 
diff --git a/docker/test/integration/cluster/checkers/AwsChecker.py b/docker/test/integration/cluster/checkers/AwsChecker.py
index 7a5e6ca71..c9e4ea057 100644
--- a/docker/test/integration/cluster/checkers/AwsChecker.py
+++ b/docker/test/integration/cluster/checkers/AwsChecker.py
@@ -20,6 +20,11 @@ class AwsChecker:
     def __init__(self, container_communicator):
         self.container_communicator = container_communicator
 
+    @retry_check()
+    def check_kinesis_server_record_data(self, container_name, record_data):
+        (code, output) = self.container_communicator.execute_command(container_name, ["node", "/app/consumer/consumer.js", record_data])
+        return code == 0
+
     @retry_check()
     def check_s3_server_object_data(self, container_name, test_data):
         (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
diff --git a/docker/test/integration/cluster/containers/KinesisServerContainer.py b/docker/test/integration/cluster/containers/KinesisServerContainer.py
new file mode 100644
index 000000000..0bee46cef
--- /dev/null
+++ b/docker/test/integration/cluster/containers/KinesisServerContainer.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+from .Container import Container
+
+
+class KinesisServerContainer(Container):
+    def __init__(self, feature_context, name, vols, network, image_store, command):
+        super().__init__(feature_context, name, 'kinesis-server', vols, network, image_store, command)
+
+    def get_startup_finished_log_entry(self):
+        return "Starting Kinesis Plain Mock Service on port 4568"
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running kinesis server docker container...')
+        self.client.containers.run(
+            self.image_store.get_image(self.get_engine()),
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            environment=["INITIALIZE_STREAMS=test_stream:3",
+                         "LOG_LEVEL=DEBUG"],
+            entrypoint=self.command)
+        logging.info('Added container \'%s\'', self.name)
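
Note: a quick way to sanity-check the mock outside the test framework is to
point a Kinesis client at the container. A sketch using boto3, assuming the
container's port 4568 is reachable on localhost (the stream name and shard
count come from INITIALIZE_STREAMS=test_stream:3 above; the credentials are
fake, as the mock accepts anything):

    import boto3

    client = boto3.client(
        "kinesis",
        endpoint_url="http://localhost:4568",
        region_name="us-east-1",
        aws_access_key_id="fake",
        aws_secret_access_key="fake",
    )
    # INITIALIZE_STREAMS=test_stream:3 should yield an ACTIVE stream with 3 shards
    description = client.describe_stream(StreamName="test_stream")["StreamDescription"]
    print(description["StreamStatus"], len(description["Shards"]))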
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 71a280f7e..8f2747714 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -251,6 +251,9 @@ class MiNiFi_integration_test:
         assert not self.cluster.segfault_happened() or self.cluster.log_app_output()
         assert validator.validate() or self.cluster.log_app_output()
 
+    def check_kinesis_server_record_data(self, kinesis_container_name, record_data):
+        assert self.cluster.check_kinesis_server_record_data(kinesis_container_name, record_data) or self.cluster.log_app_output()
+
     def check_s3_server_object_data(self, s3_container_name, object_data):
         assert self.cluster.check_s3_server_object_data(s3_container_name, object_data) or self.cluster.log_app_output()
 
diff --git a/docker/test/integration/features/kinesis.feature b/docker/test/integration/features/kinesis.feature
new file mode 100644
index 000000000..2f68fd77c
--- /dev/null
+++ b/docker/test/integration/features/kinesis.feature
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+@ENABLE_AWS
+Feature: Sending data from MiNiFi-C++ to an AWS Kinesis server
+  In order to transfer data to an AWS Kinesis server
+  As a user of MiNiFi
+  I need to have a PutKinesisStream processor
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance can send data to AWS Kinesis
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "Schnappi, das kleine Krokodil" is present in "/tmp/input"
+    And a PutKinesisStream processor set up to communicate with the kinesis server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PutKinesisStream
+    And the "success" relationship of the PutKinesisStream processor is connected to the PutFile
+
+    And a kinesis server is set up in correspondence with the PutKinesisStream
+
+    When both instances start up
+
+    Then a flowfile with the content "Schnappi, das kleine Krokodil" is placed in the monitored directory in less than 60 seconds
+    And there is a record on the kinesis server with "Schnappi, das kleine Krokodil"
diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py
index 46e4e81ca..9c3bf66f6 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -128,6 +128,7 @@ def step_impl(context, processor_type, minifi_container_name):
 @given("a {processor_type} processor set up to communicate with a kafka broker instance")
 @given("a {processor_type} processor set up to communicate with an MQTT broker instance")
 @given("a {processor_type} processor set up to communicate with the Splunk HEC instance")
+@given("a {processor_type} processor set up to communicate with the kinesis server")
 def step_impl(context, processor_type):
     __create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow")
 
@@ -473,6 +474,11 @@ def step_impl(context):
     context.test.acquire_container(context=context, name="s3-server", engine="s3-server")
 
 
+@given("a kinesis server is set up in correspondence with the PutKinesisStream")
+def step_impl(context):
+    context.test.acquire_container(context=context, name="kinesis-server", engine="kinesis-server")
+
+
 # azure storage setup
 @given("an Azure storage server is set up")
 def step_impl(context):
@@ -910,6 +916,11 @@ def step_impl(context, url):
     context.test.check_http_proxy_access('http-proxy', url)
 
 
+@then("there is a record on the kinesis server with \"{record_data}\"")
+def step_impl(context, record_data):
+    context.test.check_kinesis_server_record_data("kinesis-server", record_data)
+
+
 @then("the object on the s3 server is \"{object_data}\"")
 def step_impl(context, object_data):
     context.test.check_s3_server_object_data("s3-server", object_data)
diff --git a/docker/test/integration/minifi/processors/PutKinesisStream.py b/docker/test/integration/minifi/processors/PutKinesisStream.py
new file mode 100644
index 000000000..40e55e1c2
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutKinesisStream.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class PutKinesisStream(Processor):
+    def __init__(
+            self,
+            context,
+            proxy_host='',
+            proxy_port='',
+            proxy_username='',
+            proxy_password=''):
+        super(PutKinesisStream, self).__init__(
+            context=context,
+            clazz='PutKinesisStream',
+            properties={
+                'Amazon Kinesis Stream Name': 'test_stream',
+                'Access Key': 'test_access_key',
+                'Secret Key': 'test_secret',
+                'Endpoint Override URL': f"http://kinesis-server-{context.feature_id}:4568",
+                'Proxy Host': proxy_host,
+                'Proxy Port': proxy_port,
+                'Proxy Username': proxy_username,
+                'Proxy Password': proxy_password,
+                'Region': 'us-east-1'
+            },
+            auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/resources/kinesis-mock/Dockerfile b/docker/test/integration/resources/kinesis-mock/Dockerfile
new file mode 100644
index 000000000..567acfb23
--- /dev/null
+++ b/docker/test/integration/resources/kinesis-mock/Dockerfile
@@ -0,0 +1,8 @@
+FROM node:lts-alpine
+WORKDIR /app
+RUN npm i kinesis-local
+RUN npm i @aws-sdk/client-kinesis @aws-sdk/node-http-handler
+COPY server.json ./
+COPY consumer ./consumer
+
+ENTRYPOINT npx kinesis-local
diff --git a/docker/test/integration/resources/kinesis-mock/consumer/consumer.js b/docker/test/integration/resources/kinesis-mock/consumer/consumer.js
new file mode 100644
index 000000000..07f48cb27
--- /dev/null
+++ b/docker/test/integration/resources/kinesis-mock/consumer/consumer.js
@@ -0,0 +1,64 @@
+import { KinesisClient, DescribeStreamCommand, GetShardIteratorCommand, GetRecordsCommand } from "@aws-sdk/client-kinesis";
+import { NodeHttpHandler } from "@aws-sdk/node-http-handler";
+
+const args = process.argv.slice(2);
+if (args.length === 0) {
+    console.error("Usage: node consumer.js <search-string>");
+    process.exit(1);
+}
+
+const searchString = args[0];
+const streamName = "test_stream";
+
+const kinesis = new KinesisClient({
+    endpoint: "http://localhost:4568",
+    region: "us-east-1",
+    credentials: { accessKeyId: "fake", secretAccessKey: "fake" },
+    requestHandler: new NodeHttpHandler({ http2: false }),
+});
+
+async function getShardIterator(shardId) {
+    const command = new GetShardIteratorCommand({
+        StreamName: streamName,
+        ShardId: shardId,
+        ShardIteratorType: "TRIM_HORIZON",
+    });
+    const response = await kinesis.send(command);
+    return response.ShardIterator;
+}
+
+async function readShardRecords(shardId) {
+    let shardIterator = await getShardIterator(shardId);
+    const getRecordsCommand = new GetRecordsCommand({ ShardIterator: shardIterator });
+    const data = await kinesis.send(getRecordsCommand);
+
+    return data.Records.map(r => Buffer.from(r.Data).toString().trim());
+}
+
+async function readOnce() {
+    try {
+        console.log(`Checking stream '${streamName}' for: "${searchString}"`);
+
+        const describeCommand = new DescribeStreamCommand({ StreamName: streamName });
+        const { StreamDescription } = await kinesis.send(describeCommand);
+
+        for (let shard of StreamDescription.Shards) {
+            console.log(`Reading from shard: ${shard.ShardId}`);
+
+            const records = await readShardRecords(shard.ShardId);
+
+            if (records.includes(searchString)) {
+                console.log(`Found "${searchString}" in records.`);
+                process.exit(0);
+            }
+        }
+
+        console.log(`"${searchString}" not found in any shard.`);
+        process.exit(-1);
+    } catch (error) {
+        console.error("Error reading stream:", error);
+        process.exit(1);
+    }
+}
+
+readOnce();
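
Note: the same check can be sketched from Python with boto3, which may help
when debugging the test without the Node.js consumer (same assumptions as in
consumer.js: mock on localhost:4568, stream test_stream, fake credentials):

    import boto3

    client = boto3.client("kinesis", endpoint_url="http://localhost:4568",
                          region_name="us-east-1",
                          aws_access_key_id="fake", aws_secret_access_key="fake")
    shards = client.describe_stream(StreamName="test_stream")["StreamDescription"]["Shards"]
    for shard in shards:
        # TRIM_HORIZON starts at the oldest record in the shard, like consumer.js
        iterator = client.get_shard_iterator(StreamName="test_stream",
                                             ShardId=shard["ShardId"],
                                             ShardIteratorType="TRIM_HORIZON")["ShardIterator"]
        for record in client.get_records(ShardIterator=iterator)["Records"]:
            print(shard["ShardId"], record["Data"].decode().strip())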
diff --git a/docker/test/integration/resources/kinesis-mock/consumer/package.json b/docker/test/integration/resources/kinesis-mock/consumer/package.json
new file mode 100644
index 000000000..aead43de3
--- /dev/null
+++ b/docker/test/integration/resources/kinesis-mock/consumer/package.json
@@ -0,0 +1,3 @@
+{
+  "type": "module"
+}
\ No newline at end of file
diff --git a/docker/test/integration/resources/kinesis-mock/server.json b/docker/test/integration/resources/kinesis-mock/server.json
new file mode 100644
index 000000000..cce12612b
--- /dev/null
+++ b/docker/test/integration/resources/kinesis-mock/server.json
@@ -0,0 +1,6 @@
+{
+  "server": {
+    "key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDeEXM0RFWMOxkB\neBP3YcJhYhiouRvtHBnMj0M79Umst1OlfB2qqZ9P2nlVWN9E+vzbbbvtJBir/bkJ\nWMnh3sj86nY5EYUlaAwi2oQMVkivHFfweK/Usspy8bAyKBwM5BNJhnAX7GdR6cXu\nv7S2fUjGbp6bkATCypROrBC4HmOSM+GabZRQA6/EiKsYa53NxmBVgrm7twO3eoeS\ntVzH15/orWJm/8ukxq//E+WI4qb0LiRT79FjWoRK2czpAzP6+JqdQTSg1msCbsod\n/+k7nc5CVMStnkKMBOk1jniAfVoqDAtGJRlIMpWXtr6vQFiZE4z6t3bl2ZY/h6p6\nqmTkL1gXAgMBAAECggEBAMJ6v8zvZ4hXHVAnDD1jlStaEMR60NU3/fQ [...]
+    "cert": "-----BEGIN CERTIFICATE-----\nMIIDfTCCAmWgAwIBAgIEW6AYvzANBgkqhkiG9w0BAQsFADBuMRAwDgYDVQQGEwdV\nbmtub3duMRAwDgYDVQQIEwdVbmtub3duMRAwDgYDVQQHEwdVbmtub3duMRAwDgYD\nVQQKEwdVbmtub3duMRIwEAYDVQQLEwlGUzIgVGVzdHMxEDAOBgNVBAMTB1Vua25v\nd24wIBcNMTkxMjAyMTMyOTM3WhgPMjExODA2MjYxMzI5MzdaMG4xEDAOBgNVBAYT\nB1Vua25vd24xEDAOBgNVBAgTB1Vua25vd24xEDAOBgNVBAcTB1Vua25vd24xEDAO\nBgNVBAoTB1Vua25vd24xEjAQBgNVBAsTCUZTMiBUZXN0czEQMA4GA1UEAxMHVW5r\nbm93bjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAN4Rcz [...]
+  }
+}
diff --git a/extensions/aws/AWSCredentialsProvider.cpp b/extensions/aws/AWSCredentialsProvider.cpp
index a4ed3c42e..11c82d352 100644
--- a/extensions/aws/AWSCredentialsProvider.cpp
+++ b/extensions/aws/AWSCredentialsProvider.cpp
@@ -1,6 +1,4 @@
 /**
- * @file AWSCredentialsProvider.cpp
- * AWSCredentialsProvider class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/extensions/aws/AWSCredentialsProvider.h b/extensions/aws/AWSCredentialsProvider.h
index 664f84709..701b6a533 100644
--- a/extensions/aws/AWSCredentialsProvider.h
+++ b/extensions/aws/AWSCredentialsProvider.h
@@ -1,6 +1,4 @@
 /**
- * @file AWSCredentialsProvider.h
- * AWSCredentialsProvider class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/extensions/aws/CMakeLists.txt b/extensions/aws/CMakeLists.txt
index 11ae83543..a2bc0cd44 100644
--- a/extensions/aws/CMakeLists.txt
+++ b/extensions/aws/CMakeLists.txt
@@ -33,14 +33,20 @@ add_minifi_library(minifi-aws SHARED ${SOURCES})
 target_link_libraries(minifi-aws PUBLIC ${LIBMINIFI} Threads::Threads)
 
 target_wholearchive_library_private(minifi-aws AWS::aws-cpp-sdk-s3)
+target_wholearchive_library_private(minifi-aws AWS::aws-cpp-sdk-kinesis)
 if(CMAKE_SYSTEM_PROCESSOR MATCHES "(arm64)|(ARM64)|(aarch64)|(armv8)")
     target_wholearchive_library_private(minifi-aws AWS::aws-checksums)
 endif()
-get_target_property(AWS_SDK_INCLUDE_DIRS AWS::aws-cpp-sdk-s3 INTERFACE_INCLUDE_DIRECTORIES)
-target_include_directories(minifi-aws INTERFACE ${AWS_SDK_INCLUDE_DIRS})
+get_target_property(AWS_SDK_S3_INCLUDE_DIRS AWS::aws-cpp-sdk-s3 INTERFACE_INCLUDE_DIRECTORIES)
+get_target_property(AWS_SDK_KINESIS_INCLUDE_DIRS AWS::aws-cpp-sdk-kinesis INTERFACE_INCLUDE_DIRECTORIES)
+
+target_include_directories(minifi-aws INTERFACE ${AWS_SDK_S3_INCLUDE_DIRS})
+target_include_directories(minifi-aws INTERFACE ${AWS_SDK_KINESIS_INCLUDE_DIRS})
+
 if(WIN32)
     target_compile_definitions(minifi-aws INTERFACE "AWS_CORE_API=__declspec(dllimport)")
     target_compile_definitions(minifi-aws INTERFACE "AWS_S3_API=__declspec(dllimport)")
+    target_compile_definitions(minifi-aws INTERFACE "AWS_KINESIS_API=__declspec(dllimport)")
 endif()
 
 register_extension(minifi-aws "AWS EXTENSIONS" AWS-EXTENSIONS "This enables AWS support" "extensions/aws/tests")
diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/AwsProcessor.cpp
similarity index 69%
copy from extensions/aws/processors/S3Processor.cpp
copy to extensions/aws/processors/AwsProcessor.cpp
index 50351cec1..4ff850ee0 100644
--- a/extensions/aws/processors/S3Processor.cpp
+++ b/extensions/aws/processors/AwsProcessor.cpp
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-#include "S3Processor.h"
+#include "AwsProcessor.h"
 
 #include <memory>
 #include <string>
 #include <utility>
 
-#include "AWSCredentialsService.h"
+#include "controllerservices/AWSCredentialsService.h"
 #include "S3Wrapper.h"
 #include "core/ProcessContext.h"
 #include "properties/Properties.h"
@@ -32,18 +32,12 @@
 
 namespace org::apache::nifi::minifi::aws::processors {
 
-S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
+AwsProcessor::AwsProcessor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
   : core::ProcessorImpl(name, uuid),
     logger_(std::move(logger)) {
 }
 
-S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
-  : core::ProcessorImpl(name, uuid),
-    logger_(std::move(logger)),
-    s3_wrapper_(std::move(s3_request_sender)) {
-}
-
-std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const {
+std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const {
   if (const auto aws_credentials_service = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID())) {
     return (*aws_credentials_service)->getAWSCredentials();
   }
@@ -52,7 +46,7 @@ std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromContr
   return std::nullopt;
 }
 
-std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentials(
+std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentials(
     core::ProcessContext& context,
     const core::FlowFile* const flow_file) {
   auto service_cred = getAWSCredentialsFromControllerService(context);
@@ -78,8 +72,8 @@ std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentials(
   return aws_credentials_provider.getAWSCredentials();
 }
 
-std::optional<aws::s3::ProxyOptions> S3Processor::getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file) {
-  aws::s3::ProxyOptions proxy;
+aws::ProxyOptions AwsProcessor::getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file) {
+  aws::ProxyOptions proxy;
 
   proxy.host = minifi::utils::parseOptionalProperty(context, ProxyHost, flow_file).value_or("");
   proxy.port = gsl::narrow<uint32_t>(minifi::utils::parseOptionalU64Property(context, ProxyPort, flow_file).value_or(0));
@@ -87,22 +81,19 @@ std::optional<aws::s3::ProxyOptions> S3Processor::getProxy(core::ProcessContext&
   proxy.password = minifi::utils::parseOptionalProperty(context, ProxyPassword, flow_file).value_or("");
 
   if (!proxy.host.empty()) {
-    logger_->log_info("Proxy for S3Processor was set.");
+    logger_->log_info("Proxy for AwsProcessor was set.");
   }
   return proxy;
 }
 
-void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
+void AwsProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
   client_config_ = Aws::Client::ClientConfiguration();
-  if (!getProperty(Bucket.name)) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or invalid");
-  }
 
   client_config_->region = context.getProperty(Region) | minifi::utils::orThrow("Region property missing or invalid");
-  logger_->log_debug("S3Processor: Region [{}]", client_config_->region);
+  logger_->log_debug("AwsProcessor: Region [{}]", client_config_->region);
 
   if (auto communications_timeout = minifi::utils::parseOptionalDurationProperty(context, CommunicationsTimeout)) {
-    logger_->log_debug("S3Processor: Communications Timeout {}", *communications_timeout);
+    logger_->log_debug("AwsProcessor: Communications Timeout {}", *communications_timeout);
     client_config_->connectTimeoutMs = gsl::narrow<long>(communications_timeout->count());  // NOLINT(runtime/int,google-runtime-int)
   } else {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid");
@@ -114,17 +105,10 @@ void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSession
   }
 }
 
-std::optional<CommonProperties> S3Processor::getCommonELSupportedProperties(
+std::optional<CommonProperties> AwsProcessor::getCommonELSupportedProperties(
     core::ProcessContext& context,
     const core::FlowFile* const flow_file) {
   CommonProperties properties;
-  if (auto bucket = context.getProperty(Bucket, flow_file); !bucket || bucket->empty()) {
-    logger_->log_error("Bucket '{}' is invalid or empty!", properties.bucket);
-    return std::nullopt;
-  } else {
-    properties.bucket = *bucket;
-  }
-  logger_->log_debug("S3Processor: Bucket [{}]", properties.bucket);
 
   auto credentials = getAWSCredentials(context, flow_file);
   if (!credentials) {
@@ -132,17 +116,12 @@ std::optional<CommonProperties> S3Processor::getCommonELSupportedProperties(
     return std::nullopt;
   }
   properties.credentials = credentials.value();
-
-  auto proxy = getProxy(context, flow_file);
-  if (!proxy) {
-    return std::nullopt;
-  }
-  properties.proxy = proxy.value();
+  properties.proxy = getProxy(context, flow_file);
 
   const auto endpoint_override_url = context.getProperty(EndpointOverrideURL, flow_file);
   if (endpoint_override_url) {
     properties.endpoint_override_url = *endpoint_override_url;
-    logger_->log_debug("S3Processor: Endpoint Override URL [{}]", properties.endpoint_override_url);
+    logger_->log_debug("AwsProcessor: Endpoint Override URL [{}]", properties.endpoint_override_url);
   }
 
   return properties;
diff --git a/extensions/aws/processors/S3Processor.h b/extensions/aws/processors/AwsProcessor.h
similarity index 88%
copy from extensions/aws/processors/S3Processor.h
copy to extensions/aws/processors/AwsProcessor.h
index fe628e27c..9f0055e2e 100644
--- a/extensions/aws/processors/S3Processor.h
+++ b/extensions/aws/processors/AwsProcessor.h
@@ -1,6 +1,4 @@
 /**
- * @file S3Processor.h
- * Base S3 processor class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -29,16 +27,13 @@
 #include <utility>
 
 #include "aws/core/auth/AWSCredentialsProvider.h"
-#include "S3Wrapper.h"
 #include "AWSCredentialsProvider.h"
-#include "core/Property.h"
+#include "utils/ProxyOptions.h"
 #include "core/PropertyDefinition.h"
 #include "core/PropertyDefinitionBuilder.h"
 #include "minifi-cpp/core/PropertyValidator.h"
 #include "core/Processor.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerFactory.h"
-#include "utils/OptionalUtils.h"
+
 
 namespace org::apache::nifi::minifi::aws::processors {
 
@@ -94,21 +89,13 @@ inline constexpr auto REGIONS = std::array{
 }  // namespace region
 
 struct CommonProperties {
-  std::string bucket;
-  std::string object_key;
   Aws::Auth::AWSCredentials credentials;
-  aws::s3::ProxyOptions proxy;
+  aws::ProxyOptions proxy;
   std::string endpoint_override_url;
 };
 
-class S3Processor : public core::ProcessorImpl {
+class AwsProcessor : public core::ProcessorImpl {
  public:
-  EXTENSIONAPI static constexpr auto Bucket = core::PropertyDefinitionBuilder<>::createProperty("Bucket")
-      .withDescription("The S3 bucket")
-      .isRequired(true)
-      .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
-      .supportsExpressionLanguage(true)
-      .build();
   EXTENSIONAPI static constexpr auto AccessKey = core::PropertyDefinitionBuilder<>::createProperty("Access Key")
       .withDescription("AWS account access key")
       .supportsExpressionLanguage(true)
@@ -141,8 +128,8 @@ class S3Processor : public core::ProcessorImpl {
           "port, and path. The AWS libraries select an endpoint URL based on the AWS "
           "region, but this property overrides the selected endpoint URL, allowing use "
           "with other S3-compatible endpoints.")
-      .supportsExpressionLanguage(true)
       .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
+      .supportsExpressionLanguage(true)
       .build();
   EXTENSIONAPI static constexpr auto ProxyHost = core::PropertyDefinitionBuilder<>::createProperty("Proxy Host")
       .withDescription("Proxy host name or IP")
@@ -168,7 +155,6 @@ class S3Processor : public core::ProcessorImpl {
       .isRequired(true)
       .build();
   EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
-      Bucket,
       AccessKey,
       SecretKey,
       CredentialsFile,
@@ -184,20 +170,17 @@ class S3Processor : public core::ProcessorImpl {
   });
 
 
-  explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger);
+  explicit AwsProcessor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger);
 
   void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
 
  protected:
-  explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender);
-
   std::optional<Aws::Auth::AWSCredentials> getAWSCredentialsFromControllerService(core::ProcessContext& context) const;
-  std::optional<Aws::Auth::AWSCredentials> getAWSCredentials(core::ProcessContext& context, const core::FlowFile* const flow_file);
-  std::optional<aws::s3::ProxyOptions> getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file);
-  std::optional<CommonProperties> getCommonELSupportedProperties(core::ProcessContext& context, const core::FlowFile* const flow_file);
+  std::optional<Aws::Auth::AWSCredentials> getAWSCredentials(core::ProcessContext& context, const core::FlowFile* flow_file);
+  aws::ProxyOptions getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file);
+  std::optional<CommonProperties> getCommonELSupportedProperties(core::ProcessContext& context, const core::FlowFile* flow_file);
 
   std::shared_ptr<core::logging::Logger> logger_;
-  aws::s3::S3Wrapper s3_wrapper_;
   std::optional<Aws::Client::ClientConfiguration> client_config_;
 };
 
diff --git a/extensions/aws/processors/DeleteS3Object.cpp b/extensions/aws/processors/DeleteS3Object.cpp
index e02efaf5b..84b375e6e 100644
--- a/extensions/aws/processors/DeleteS3Object.cpp
+++ b/extensions/aws/processors/DeleteS3Object.cpp
@@ -32,9 +32,10 @@ void DeleteS3Object::initialize() {
 }
 
 std::optional<aws::s3::DeleteObjectRequestParameters> DeleteS3Object::buildDeleteS3RequestParams(
-    core::ProcessContext& context,
+    const core::ProcessContext& context,
     const core::FlowFile& flow_file,
-    const CommonProperties &common_properties) const {
+    const CommonProperties& common_properties,
+    const std::string_view bucket) const {
   gsl_Expects(client_config_);
   aws::s3::DeleteObjectRequestParameters params(common_properties.credentials, *client_config_);
   if (const auto object_key = context.getProperty(ObjectKey, &flow_file)) {
@@ -51,7 +52,7 @@ std::optional<aws::s3::DeleteObjectRequestParameters> DeleteS3Object::buildDelet
   }
   logger_->log_debug("DeleteS3Object: Version [{}]", params.version);
 
-  params.bucket = common_properties.bucket;
+  params.bucket = bucket;
   params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
   return params;
 }
@@ -70,17 +71,25 @@ void DeleteS3Object::onTrigger(core::ProcessContext& context, core::ProcessSessi
     return;
   }
 
-  auto params = buildDeleteS3RequestParams(context, *flow_file, *common_properties);
+  auto bucket = context.getProperty(Bucket.name, flow_file.get());
+  if (!bucket) {
+    logger_->log_error("Bucket is invalid due to {}", bucket.error().message());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+  logger_->log_debug("S3Processor: Bucket [{}]", *bucket);
+
+  auto params = buildDeleteS3RequestParams(context, *flow_file, *common_properties, *bucket);
   if (!params) {
     session.transfer(flow_file, Failure);
     return;
   }
 
   if (s3_wrapper_.deleteObject(*params)) {
-    logger_->log_debug("Successfully deleted S3 object '{}' from bucket '{}'", params->object_key, common_properties->bucket);
+    logger_->log_debug("Successfully deleted S3 object '{}' from bucket '{}'", params->object_key, *bucket);
     session.transfer(flow_file, Success);
   } else {
-    logger_->log_error("Failed to delete S3 object '{}' from bucket '{}'", params->object_key, common_properties->bucket);
+    logger_->log_error("Failed to delete S3 object '{}' from bucket '{}'", params->object_key, *bucket);
     session.transfer(flow_file, Failure);
     session.transfer(flow_file, Failure);
   }
 }
diff --git a/extensions/aws/processors/DeleteS3Object.h b/extensions/aws/processors/DeleteS3Object.h
index 94dbf0f4f..2d983d2a8 100644
--- a/extensions/aws/processors/DeleteS3Object.h
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -81,9 +81,10 @@ class DeleteS3Object : public S3Processor {
   }
 
   std::optional<aws::s3::DeleteObjectRequestParameters> buildDeleteS3RequestParams(
-    core::ProcessContext& context,
+    const core::ProcessContext& context,
     const core::FlowFile& flow_file,
-    const CommonProperties &common_properties) const;
+    const CommonProperties &common_properties,
+    std::string_view bucket) const;
 };
 
 }  // namespace org::apache::nifi::minifi::aws::processors
diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp
index 194d68f54..9a7bb0ec4 100644
--- a/extensions/aws/processors/FetchS3Object.cpp
+++ b/extensions/aws/processors/FetchS3Object.cpp
@@ -1,6 +1,4 @@
 /**
- * @file FetchS3Object.cpp
- * FetchS3Object class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -43,12 +41,13 @@ void FetchS3Object::onSchedule(core::ProcessContext& context, core::ProcessSessi
 }
 
 std::optional<aws::s3::GetObjectRequestParameters> FetchS3Object::buildFetchS3RequestParams(
-    core::ProcessContext& context,
+    const core::ProcessContext& context,
     const core::FlowFile& flow_file,
-    const CommonProperties &common_properties) const {
+    const CommonProperties &common_properties,
+    const std::string_view bucket) const {
   gsl_Expects(client_config_);
   minifi::aws::s3::GetObjectRequestParameters get_object_params(common_properties.credentials, *client_config_);
-  get_object_params.bucket = common_properties.bucket;
+  get_object_params.bucket = bucket;
   get_object_params.requester_pays = requester_pays_;
 
   if (const auto object_key = context.getProperty(ObjectKey, &flow_file)) {
@@ -82,7 +81,15 @@ void FetchS3Object::onTrigger(core::ProcessContext& context, core::ProcessSessio
     return;
   }
 
-  auto get_object_params = buildFetchS3RequestParams(context, *flow_file, *common_properties);
+  auto bucket = context.getProperty(Bucket.name, flow_file.get());
+  if (!bucket) {
+    logger_->log_error("Bucket is invalid due to {}", bucket.error().message());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+  logger_->log_debug("S3Processor: Bucket [{}]", *bucket);
+
+  auto get_object_params = buildFetchS3RequestParams(context, *flow_file, *common_properties, *bucket);
   if (!get_object_params) {
     session.transfer(flow_file, Failure);
     return;
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 5505fff59..ea933b4e7 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -92,9 +92,10 @@ class FetchS3Object : public S3Processor {
   }
 
   std::optional<aws::s3::GetObjectRequestParameters> buildFetchS3RequestParams(
-    core::ProcessContext& context,
+    const core::ProcessContext& context,
     const core::FlowFile& flow_file,
-    const CommonProperties &common_properties) const;
+    const CommonProperties &common_properties,
+    std::string_view bucket) const;
 
   bool requester_pays_ = false;
 };
diff --git a/extensions/aws/processors/ListS3.cpp b/extensions/aws/processors/ListS3.cpp
index a3a482ae4..f11bca209 100644
--- a/extensions/aws/processors/ListS3.cpp
+++ b/extensions/aws/processors/ListS3.cpp
@@ -25,6 +25,7 @@
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
 #include "utils/ProcessorConfigUtils.h"
+#include "utils/expected.h"
 
 namespace org::apache::nifi::minifi::aws::processors {
 
@@ -47,10 +48,13 @@ void ListS3::onSchedule(core::ProcessContext& context, core::ProcessSessionFacto
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required property is not set or invalid");
   }
 
+  auto bucket = context.getProperty(Bucket.name) | minifi::utils::orThrow("Required property");
+  logger_->log_debug("S3Processor: Bucket [{}]", bucket);
+
   gsl_Expects(client_config_);
   list_request_params_ = std::make_unique<aws::s3::ListRequestParameters>(common_properties->credentials, *client_config_);
   list_request_params_->setClientConfig(common_properties->proxy, common_properties->endpoint_override_url);
-  list_request_params_->bucket = common_properties->bucket;
+  list_request_params_->bucket = bucket;
 
   if (const auto delimiter = context.getProperty(Delimiter)) {
     list_request_params_->delimiter = *delimiter;
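
ListS3 has no incoming flow files, so its Bucket is resolved once at schedule time, and a missing value is turned into a scheduling exception through the expected-pipe utility. A rough sketch of that orThrow idiom under assumed expected-lite-style signatures (the real helper lives in utils/expected.h and may differ):

    // Sketch of the "expected | orThrow" idiom; illustrative only.
    #include <stdexcept>
    #include <string>
    #include <nonstd/expected.hpp>

    struct or_throw_t { std::string message; };

    template <typename T, typename E>
    T operator|(nonstd::expected<T, E> value, const or_throw_t& tag) {
      if (!value) throw std::runtime_error(tag.message);  // error branch becomes an exception
      return *value;
    }

    // usage: auto bucket = context.getProperty(Bucket.name) | or_throw_t{"Bucket missing or invalid"};
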
diff --git a/extensions/aws/processors/PutKinesisStream.cpp b/extensions/aws/processors/PutKinesisStream.cpp
new file mode 100644
index 000000000..e34b79ea5
--- /dev/null
+++ b/extensions/aws/processors/PutKinesisStream.cpp
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutKinesisStream.h"
+
+#include <memory>
+#include <random>
+#include <ranges>
+#include <unordered_map>
+
+#include "aws/kinesis/KinesisClient.h"
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "range/v3/algorithm/for_each.hpp"
+#include "range/v3/view.hpp"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+void PutKinesisStream::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void PutKinesisStream::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) {
+  AwsProcessor::onSchedule(context, session_factory);
+
+  batch_size_ = parseU64Property(context, MessageBatchSize);
+  if (batch_size_ == 0 || batch_size_ > 500) {
+    logger_->log_warn("{} is invalid. Setting it to the maximum value of 500.", MessageBatchSize.name);
+    batch_size_ = 500;
+  }
+  batch_data_size_soft_cap_ = parseDataSizeProperty(context, MaxBatchDataSize);
+  if (batch_data_size_soft_cap_ > 4_MB) {
+    logger_->log_warn("{} is invalid. Setting it to the maximum value of 4 MB.", MaxBatchDataSize.name);
+    batch_data_size_soft_cap_ = 4_MB;
+  }
+
+  endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | minifi::utils::toOptional();
+}
+
+nonstd::expected<Aws::Kinesis::Model::PutRecordsRequestEntry, PutKinesisStream::BatchItemError> PutKinesisStream::createEntryFromFlowFile(const core::ProcessContext& context,
+    core::ProcessSession& session,
+    const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Aws::Kinesis::Model::PutRecordsRequestEntry entry;
+  const auto partition_key = context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get()) | minifi::utils::valueOrElse([&flow_file] { return flow_file->getUUID().to_string(); });
+  entry.SetPartitionKey(partition_key);
+  const auto [status, buffer] = session.readBuffer(flow_file);
+  if (io::isError(status)) {
+    logger_->log_error("Couldn't read content from {}", flow_file->getUUIDStr());
+    return nonstd::make_unexpected(BatchItemError{.error_message = "Failed to read content", .error_code = std::nullopt});
+  }
+  Aws::Utils::ByteBuffer aws_buffer(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size());
+  entry.SetData(aws_buffer);
+  return entry;
+}
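
Note that Aws::Utils::ByteBuffer copies the bytes it is given, so each record's payload is briefly held twice: once in the session read buffer and once in the SDK request entry. A minimal sketch of the conversion (real SDK type, simplified input):

    // ByteBuffer(const unsigned char*, size_t) copies the input bytes.
    #include <aws/core/utils/Array.h>
    #include <cstddef>
    #include <vector>

    Aws::Utils::ByteBuffer toAwsBuffer(const std::vector<std::byte>& buffer) {
      return Aws::Utils::ByteBuffer(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size());
    }
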
+
+std::unordered_map<std::string, PutKinesisStream::StreamBatch> PutKinesisStream::createStreamBatches(const core::ProcessContext& context, core::ProcessSession& session) const {
+  static constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB;
+  std::unordered_map<std::string, StreamBatch> stream_batches;
+  uint64_t ff_count_in_batches = 0;
+  while (ff_count_in_batches < batch_size_) {
+    std::shared_ptr<core::FlowFile> flow_file = session.get();
+    if (!flow_file) { break; }
+    const auto flow_file_size = flow_file->getSize();
+    if (flow_file_size > SINGLE_RECORD_MAX_SIZE) {
+      flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("record too big {}, max allowed {}", flow_file_size, SINGLE_RECORD_MAX_SIZE));
+      session.transfer(flow_file, Failure);
+      logger_->log_error("Failed to publish record {} to Kinesis because its size was greater than {} bytes", flow_file->getUUID().to_string(), SINGLE_RECORD_MAX_SIZE);
+      continue;
+    }
+
+    auto stream_name = context.getProperty(AmazonKinesisStreamName.name, flow_file.get());
+    if (!stream_name) {
+      logger_->log_error("Stream name is invalid due to {}", stream_name.error().message());
+      flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("Stream name is invalid due to {}", stream_name.error().message()));
+      session.transfer(flow_file, Failure);
+      continue;
+    }
+
+    auto entry = createEntryFromFlowFile(context, session, flow_file);
+    if (!entry) {
+      flow_file->addAttribute(AwsKinesisErrorMessage.name, entry.error().error_message);
+      if (entry.error().error_code) {
+        flow_file->addAttribute(AwsKinesisErrorCode.name, *entry.error().error_code);
+      }
+      session.transfer(flow_file, Failure);
+      continue;
+    }
+
+    auto [stream_batch, newly_created] = stream_batches.emplace(*stream_name, StreamBatch{});
+    if (newly_created) {
+      stream_batch->second.request.SetStreamName(*stream_name);
+    }
+    stream_batch->second.request.AddRecords(*entry);
+    stream_batch->second.items.push_back(BatchItem{.flow_file = std::move(flow_file), .result = BatchItemResult{}});
+    stream_batch->second.batch_size += flow_file_size;
+    ++ff_count_in_batches;
+
+    if (stream_batch->second.batch_size > batch_data_size_soft_cap_) {
+      break;
+    }
+  }
+  return stream_batches;
+}
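
The batching above keys requests by the resolved stream name, so one onTrigger can feed several streams, and the data-size cap is "soft": the record that crosses it is still included, then the loop stops. The same shape in a compact, standalone sketch (Record/Batch are illustrative stand-ins, not the SDK types):

    #include <cstdint>
    #include <string>
    #include <unordered_map>
    #include <vector>

    struct Record { std::string stream; std::string payload; };
    struct Batch { uint64_t bytes = 0; std::vector<Record> records; };

    std::unordered_map<std::string, Batch> groupByStream(const std::vector<Record>& input, uint64_t soft_cap) {
      std::unordered_map<std::string, Batch> batches;
      for (const auto& record : input) {
        auto& batch = batches[record.stream];  // create or reuse the per-stream batch
        batch.records.push_back(record);
        batch.bytes += record.payload.size();
        if (batch.bytes > soft_cap) break;     // soft cap: the crossing record stays in
      }
      return batches;
    }
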
+
+void PutKinesisStream::processBatch(StreamBatch& stream_batch, const Aws::Kinesis::KinesisClient& client) const {
+  const auto put_record_result = client.PutRecords(stream_batch.request);
+  if (!put_record_result.IsSuccess()) {
+    ranges::for_each(stream_batch.items, [&](auto& item) {
+      item.result = nonstd::make_unexpected(BatchItemError{
+        .error_message = put_record_result.GetError().GetMessage(),
+        .error_code = std::to_string(static_cast<int>(put_record_result.GetError().GetResponseCode()))});
+    });
+    return;
+  }
+
+  const auto result_records = put_record_result.GetResult().GetRecords();
+  if (result_records.size() != stream_batch.items.size()) {
+    logger_->log_critical("PutKinesisStream record size ({}) and result size ({}) mismatch in {}; cannot tell which record succeeded and which didn't",
+        stream_batch.items.size(), result_records.size(), stream_batch.request.GetStreamName());
+    ranges::for_each(stream_batch.items, [&](auto& item) {
+      item.result = nonstd::make_unexpected(BatchItemError{
+        .error_message = "Record size mismatch",
+        .error_code = std::nullopt});
+    });
+    return;
+  }
+
+  for (uint64_t i = 0; i < stream_batch.items.size(); i++) {
+    auto& [flow_file, result] = stream_batch.items[i];
+    const auto& result_record = result_records[i];
+    if (!result_record.GetErrorCode().empty()) {
+      result = nonstd::make_unexpected(BatchItemError{.error_message = result_record.GetErrorMessage(), .error_code = result_record.GetErrorCode()});
+    } else {
+      result = BatchItemResult{.sequence_number = result_record.GetSequenceNumber(), .shard_id = result_record.GetShardId()};
+    }
+  }
+}
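
PutRecords is not transactional: a request-level success can still carry per-record failures, which is why every result entry's ErrorCode has to be inspected and each item tracks its own expected<Result, Error>. (The count-mismatch branch should be unreachable against a conforming service; it is handled defensively.) A sketch of the per-record check against the real SDK types:

    #include <aws/kinesis/model/PutRecordsResultEntry.h>

    // Result entries line up index-for-index with the request records. On
    // success an entry carries SequenceNumber/ShardId; on failure it carries
    // ErrorCode (e.g. "ProvisionedThroughputExceededException") and ErrorMessage.
    bool recordSucceeded(const Aws::Kinesis::Model::PutRecordsResultEntry& entry) {
      return entry.GetErrorCode().empty();
    }
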
+
+void PutKinesisStream::transferFlowFiles(core::ProcessSession& session, const StreamBatch& stream_batch) {
+  for (const auto& batch_item : stream_batch.items) {
+    if (batch_item.result) {
+      batch_item.flow_file->setAttribute(AwsKinesisSequenceNumber.name, batch_item.result->sequence_number);
+      batch_item.flow_file->setAttribute(AwsKinesisShardId.name, batch_item.result->shard_id);
+      session.transfer(batch_item.flow_file, Success);
+    } else {
+      batch_item.flow_file->setAttribute(AwsKinesisErrorMessage.name, batch_item.result.error().error_message);
+      if (batch_item.result.error().error_code) {
+        batch_item.flow_file->setAttribute(AwsKinesisErrorCode.name, *batch_item.result.error().error_code);
+      }
+      session.transfer(batch_item.flow_file, Failure);
+    }
+  }
+}
+
+void PutKinesisStream::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  logger_->log_trace("PutKinesisStream onTrigger");
+
+  const auto credentials = getAWSCredentials(context, nullptr);
+  if (!credentials) {
+    logger_->log_error("Failed to get credentials for PutKinesisStream");
+    context.yield();
+    return;
+  }
+
+  auto stream_batches = createStreamBatches(context, session);
+  if (stream_batches.empty()) {
+    context.yield();
+    return;
+  }
+  const auto kinesis_client = getClient(*credentials);
+
+  for (auto& stream_batch : stream_batches | std::views::values) {
+    processBatch(stream_batch, *kinesis_client);
+    transferFlowFiles(session, stream_batch);
+  }
+}
+
+std::unique_ptr<Aws::Kinesis::KinesisClient> PutKinesisStream::getClient(const Aws::Auth::AWSCredentials& credentials) {
+  gsl_Expects(client_config_);
+  auto client = std::make_unique<Aws::Kinesis::KinesisClient>(credentials, *client_config_);
+  if (endpoint_override_url_) {
+    client->OverrideEndpoint(*endpoint_override_url_);
+  }
+  return client;
+}
+
+REGISTER_RESOURCE(PutKinesisStream, Processor);
+
+}  // namespace org::apache::nifi::minifi::aws::processors
diff --git a/extensions/aws/processors/PutKinesisStream.h b/extensions/aws/processors/PutKinesisStream.h
new file mode 100644
index 000000000..c30483a20
--- /dev/null
+++ b/extensions/aws/processors/PutKinesisStream.h
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "AwsProcessor.h"
+#include "aws/kinesis/KinesisClient.h"
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "utils/AWSInitializer.h"
+#include "utils/ArrayUtils.h"
+#include "utils/Literals.h"
+#include "utils/expected.h"
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+class PutKinesisStream : public AwsProcessor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "Sends the contents to a specified Amazon Kinesis stream. In order to send data to Kinesis, the stream name has to be specified.";
+
+  EXTENSIONAPI static constexpr auto AmazonKinesisStreamName = core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream Name")
+      .withDescription("The name of the Kinesis stream")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto AmazonKinesisStreamPartitionKey = core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream Partition Key")
+      .withDescription("The partition key to use for each record. If it is not set, the flow file's UUID is used")
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto MessageBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Size")
+      .withDescription("The maximum number of messages to send in a single batch; must be between 1 and 500")
+      .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
+      .withDefaultValue("250")
+      .build();
+  EXTENSIONAPI static constexpr auto MaxBatchDataSize = core::PropertyDefinitionBuilder<>::createProperty("Max Batch Data Size")
+      .withDescription("Soft cap on the data size of a batch sent to a single stream (maximum 4 MB)")
+      .withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)
+      .withDefaultValue("1 MB")
+      .build();
+
+  EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(AwsProcessor::Properties, std::to_array<core::PropertyReference>({
+    AmazonKinesisStreamName, AmazonKinesisStreamPartitionKey, MessageBatchSize, MaxBatchDataSize
+  }));
+
+  EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles are routed to the success relationship"};
+  EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "FlowFiles are routed to the failure relationship"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};
+
+  EXTENSIONAPI static constexpr auto AwsKinesisErrorMessage = core::OutputAttributeDefinition<>{"aws.kinesis.error.message", { Failure },
+    "Error message on posting message to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto AwsKinesisErrorCode = core::OutputAttributeDefinition<>{"aws.kinesis.error.code", { Failure },
+    "Error code for the message when posting to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto AwsKinesisSequenceNumber = core::OutputAttributeDefinition<>{"aws.kinesis.sequence.number", { Success },
+    "Sequence number for the message when posting to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto AwsKinesisShardId = core::OutputAttributeDefinition<>{"aws.kinesis.shard.id", { Success },
+    "Shard id of the message posted to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto OutputAttributes = std::to_array<core::OutputAttributeReference>({
+    AwsKinesisErrorMessage,
+    AwsKinesisErrorCode,
+    AwsKinesisSequenceNumber,
+    AwsKinesisShardId});
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  explicit PutKinesisStream(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : AwsProcessor(name, uuid, core::logging::LoggerFactory<PutKinesisStream>::getLogger(uuid)) {
+  }
+  PutKinesisStream(const PutKinesisStream&) = delete;
+  PutKinesisStream(PutKinesisStream&&) = delete;
+  PutKinesisStream& operator=(const PutKinesisStream&) = delete;
+  PutKinesisStream& operator=(PutKinesisStream&&) = delete;
+  ~PutKinesisStream() override = default;
+
+  void initialize() override;
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
+
+ protected:
+  virtual std::unique_ptr<Aws::Kinesis::KinesisClient> getClient(const Aws::Auth::AWSCredentials& credentials);
+
+ private:
+  struct BatchItemResult {
+    std::string sequence_number;
+    std::string shard_id;
+  };
+  struct BatchItemError {
+    std::string error_message;
+    std::optional<std::string> error_code;
+  };
+  struct BatchItem {
+    std::shared_ptr<core::FlowFile> flow_file;
+    nonstd::expected<BatchItemResult, BatchItemError> result;
+  };
+  struct StreamBatch {
+    uint64_t batch_size = 0;
+    std::vector<BatchItem> items;
+    Aws::Kinesis::Model::PutRecordsRequest request;
+  };
+
+  nonstd::expected<Aws::Kinesis::Model::PutRecordsRequestEntry, BatchItemError> createEntryFromFlowFile(const core::ProcessContext& context,
+      core::ProcessSession& session,
+      const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  std::unordered_map<std::string, StreamBatch> createStreamBatches(const core::ProcessContext& context, core::ProcessSession& session) const;
+  void processBatch(StreamBatch& stream_batch, const Aws::Kinesis::KinesisClient& client) const;
+  static void transferFlowFiles(core::ProcessSession& session, const StreamBatch& stream_batch);
+
+  uint64_t batch_size_ = 250;
+  uint64_t batch_data_size_soft_cap_ = 1_MB;
+  const utils::AWSInitializer& AWS_INITIALIZER = utils::AWSInitializer::get();
+  std::optional<std::string> endpoint_override_url_;
+};
+
+}  // namespace org::apache::nifi::minifi::aws::processors
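
The per-item nonstd::expected<BatchItemResult, BatchItemError> keeps transferFlowFiles to a single branch: success carries sequence number and shard id, failure carries a message plus optional code. A self-contained sketch of the same pattern with expected-lite:

    #include <iostream>
    #include <optional>
    #include <string>
    #include <nonstd/expected.hpp>

    struct Ok  { std::string sequence_number; std::string shard_id; };
    struct Err { std::string message; std::optional<std::string> code; };

    nonstd::expected<Ok, Err> publish(bool succeed) {
      if (!succeed) return nonstd::make_unexpected(Err{.message = "throttled", .code = "400"});
      return Ok{.sequence_number = "1", .shard_id = "shard-0"};
    }

    int main() {
      if (const auto result = publish(false); !result) {
        std::cout << result.error().message << '\n';  // prints "throttled"
      }
    }
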
diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp
index b1d7b972c..ddb218f98 100644
--- a/extensions/aws/processors/PutS3Object.cpp
+++ b/extensions/aws/processors/PutS3Object.cpp
@@ -119,7 +119,7 @@ std::string PutS3Object::parseAccessControlList(const std::string &comma_separat
 }
 
 bool PutS3Object::setCannedAcl(
-    core::ProcessContext& context,
+    const core::ProcessContext& context,
     const core::FlowFile& flow_file,
     aws::s3::PutObjectRequestParameters &put_s3_request_params) const {
   if (const auto canned_acl = context.getProperty(CannedACL, &flow_file)) {
@@ -135,7 +135,7 @@ bool PutS3Object::setCannedAcl(
 }
 
 bool PutS3Object::setAccessControl(
-      core::ProcessContext& context,
+      const core::ProcessContext& context,
       const core::FlowFile& flow_file,
       aws::s3::PutObjectRequestParameters &put_s3_request_params) const {
   if (const auto full_control_user_list = context.getProperty(FullControlUserList, &flow_file)) {
@@ -159,13 +159,14 @@ bool PutS3Object::setAccessControl(
 }
 
 std::optional<aws::s3::PutObjectRequestParameters> PutS3Object::buildPutS3RequestParams(
-    core::ProcessContext& context,
+    const core::ProcessContext& context,
     const core::FlowFile& flow_file,
-    const CommonProperties &common_properties) const {
+    const CommonProperties &common_properties,
+    const std::string_view bucket) const {
   gsl_Expects(client_config_);
   aws::s3::PutObjectRequestParameters params(common_properties.credentials, *client_config_);
   params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
-  params.bucket = common_properties.bucket;
+  params.bucket = bucket;
   params.user_metadata_map = user_metadata_map_;
   params.server_side_encryption = server_side_encryption_;
   params.storage_class = storage_class_;
@@ -215,7 +216,7 @@ void PutS3Object::setAttributes(
   }
 }
 
-void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
+void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties, const std::string_view bucket) {
   {
     std::lock_guard<std::mutex> lock(last_ageoff_mutex_);
     const auto now = std::chrono::system_clock::now();
@@ -229,7 +230,7 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properti
   logger_->log_trace("Listing aged off multipart uploads still in progress.");
   aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
   list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
-  list_params.bucket = common_properties.bucket;
+  list_params.bucket = bucket;
   list_params.age_off_limit = multipart_upload_max_age_threshold_;
   list_params.use_virtual_addressing = use_virtual_addressing_;
   auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
@@ -238,14 +239,14 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properti
     return;
   }
 
-  logger_->log_info("Found {} aged off pending multipart upload jobs in bucket '{}'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  logger_->log_info("Found {} aged off pending multipart upload jobs in bucket '{}'", aged_off_uploads_in_progress->size(), bucket);
   size_t aborted = 0;
   for (const auto& upload : *aged_off_uploads_in_progress) {
     logger_->log_info("Aborting multipart upload with key '{}' and upload id '{}' in bucket '{}' due to reaching maximum upload age threshold.",
-      upload.key, upload.upload_id, common_properties.bucket);
+      upload.key, upload.upload_id, bucket);
     aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
     abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
-    abort_params.bucket = common_properties.bucket;
+    abort_params.bucket = bucket;
     abort_params.key = upload.key;
     abort_params.upload_id = upload.upload_id;
     abort_params.use_virtual_addressing = use_virtual_addressing_;
@@ -256,7 +257,7 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properti
     ++aborted;
   }
   if (aborted > 0) {
-    logger_->log_info("Aborted {} pending multipart upload jobs in bucket '{}'", aborted, common_properties.bucket);
+    logger_->log_info("Aborted {} pending multipart upload jobs in bucket '{}'", aborted, bucket);
   }
   s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
 }
@@ -275,9 +276,17 @@ void PutS3Object::onTrigger(core::ProcessContext& context, core::ProcessSession&
     return;
   }
 
-  ageOffMultipartUploads(*common_properties);
+  auto bucket = context.getProperty(Bucket.name, flow_file.get());
+  if (!bucket) {
+    logger_->log_error("Bucket is invalid due to {}", bucket.error().message());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+  logger_->log_debug("S3Processor: Bucket [{}]", *bucket);
+
+  ageOffMultipartUploads(*common_properties, *bucket);
 
-  auto put_s3_request_params = buildPutS3RequestParams(context, *flow_file, *common_properties);
+  auto put_s3_request_params = buildPutS3RequestParams(context, *flow_file, *common_properties, *bucket);
   if (!put_s3_request_params) {
     session.transfer(flow_file, Failure);
     return;
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index 6e2ca0b47..1af4d1cfd 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -210,18 +210,19 @@ class PutS3Object : public S3Processor {
 
   void fillUserMetadata(core::ProcessContext& context);
   static std::string parseAccessControlList(const std::string &comma_separated_list);
-  bool setCannedAcl(core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const;
-  bool setAccessControl(core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const;
+  bool setCannedAcl(const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const;
+  bool setAccessControl(const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const;
   void setAttributes(
     core::ProcessSession& session,
     core::FlowFile& flow_file,
     const aws::s3::PutObjectRequestParameters &put_s3_request_params,
     const minifi::aws::s3::PutObjectResult &put_object_result) const;
   std::optional<aws::s3::PutObjectRequestParameters> buildPutS3RequestParams(
-    core::ProcessContext& context,
+    const core::ProcessContext& context,
     const core::FlowFile& flow_file,
-    const CommonProperties &common_properties) const;
-  void ageOffMultipartUploads(const CommonProperties &common_properties);
+    const CommonProperties &common_properties,
+    std::string_view bucket) const;
+  void ageOffMultipartUploads(const CommonProperties &common_properties, const std::string_view bucket);
 
   std::string user_metadata_;
   std::map<std::string, std::string> user_metadata_map_;
diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp
index 50351cec1..6be93f626 100644
--- a/extensions/aws/processors/S3Processor.cpp
+++ b/extensions/aws/processors/S3Processor.cpp
@@ -32,120 +32,19 @@
 
 namespace org::apache::nifi::minifi::aws::processors {
 
-S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
-  : core::ProcessorImpl(name, uuid),
-    logger_(std::move(logger)) {
-}
+S3Processor::S3Processor(const std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
+  : AwsProcessor(name, uuid, std::move(logger)) {}
 
-S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
-  : core::ProcessorImpl(name, uuid),
-    logger_(std::move(logger)),
+S3Processor::S3Processor(const std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
+  : AwsProcessor(name, uuid, std::move(logger)),
     s3_wrapper_(std::move(s3_request_sender)) {
 }
 
-std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const {
-  if (const auto aws_credentials_service = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID())) {
-    return (*aws_credentials_service)->getAWSCredentials();
-  }
-  logger_->log_error("AWS credentials service could not be found");
-
-  return std::nullopt;
-}
-
-std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentials(
-    core::ProcessContext& context,
-    const core::FlowFile* const flow_file) {
-  auto service_cred = getAWSCredentialsFromControllerService(context);
-  if (service_cred) {
-    logger_->log_info("AWS Credentials successfully set from controller service");
-    return service_cred.value();
-  }
-
-  aws::AWSCredentialsProvider aws_credentials_provider;
-  if (const auto access_key = context.getProperty(AccessKey.name, flow_file)) {
-    aws_credentials_provider.setAccessKey(*access_key);
-  }
-  if (const auto secret_key = context.getProperty(SecretKey.name, flow_file)) {
-    aws_credentials_provider.setSecretKey(*secret_key);
-  }
-  if (const auto credentials_file = context.getProperty(CredentialsFile.name, flow_file)) {
-    aws_credentials_provider.setCredentialsFile(*credentials_file);
-  }
-  if (const auto use_credentials = context.getProperty(UseDefaultCredentials.name, flow_file) | minifi::utils::andThen(parsing::parseBool)) {
-    aws_credentials_provider.setUseDefaultCredentials(*use_credentials);
-  }
-
-  return aws_credentials_provider.getAWSCredentials();
-}
-
-std::optional<aws::s3::ProxyOptions> S3Processor::getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file) {
-  aws::s3::ProxyOptions proxy;
-
-  proxy.host = minifi::utils::parseOptionalProperty(context, ProxyHost, flow_file).value_or("");
-  proxy.port = gsl::narrow<uint32_t>(minifi::utils::parseOptionalU64Property(context, ProxyPort, flow_file).value_or(0));
-  proxy.username = minifi::utils::parseOptionalProperty(context, ProxyUsername, flow_file).value_or("");
-  proxy.password = minifi::utils::parseOptionalProperty(context, ProxyPassword, flow_file).value_or("");
-
-  if (!proxy.host.empty()) {
-    logger_->log_info("Proxy for S3Processor was set.");
-  }
-  return proxy;
-}
-
-void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
-  client_config_ = Aws::Client::ClientConfiguration();
+void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) {
+  AwsProcessor::onSchedule(context, session_factory);
   if (!getProperty(Bucket.name)) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or invalid");
   }
-
-  client_config_->region = context.getProperty(Region) | minifi::utils::orThrow("Region property missing or invalid");
-  logger_->log_debug("S3Processor: Region [{}]", client_config_->region);
-
-  if (auto communications_timeout = minifi::utils::parseOptionalDurationProperty(context, CommunicationsTimeout)) {
-    logger_->log_debug("S3Processor: Communications Timeout {}", *communications_timeout);
-    client_config_->connectTimeoutMs = gsl::narrow<long>(communications_timeout->count());  // NOLINT(runtime/int,google-runtime-int)
-  } else {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid");
-  }
-
-  static const auto default_ca_file = minifi::utils::getDefaultCAFile();
-  if (default_ca_file) {
-    client_config_->caFile = *default_ca_file;
-  }
-}
-
-std::optional<CommonProperties> S3Processor::getCommonELSupportedProperties(
-    core::ProcessContext& context,
-    const core::FlowFile* const flow_file) {
-  CommonProperties properties;
-  if (auto bucket = context.getProperty(Bucket, flow_file); !bucket || bucket->empty()) {
-    logger_->log_error("Bucket '{}' is invalid or empty!", properties.bucket);
-    return std::nullopt;
-  } else {
-    properties.bucket = *bucket;
-  }
-  logger_->log_debug("S3Processor: Bucket [{}]", properties.bucket);
-
-  auto credentials = getAWSCredentials(context, flow_file);
-  if (!credentials) {
-    logger_->log_error("AWS Credentials have not been set!");
-    return std::nullopt;
-  }
-  properties.credentials = credentials.value();
-
-  auto proxy = getProxy(context, flow_file);
-  if (!proxy) {
-    return std::nullopt;
-  }
-  properties.proxy = proxy.value();
-
-  const auto endpoint_override_url = context.getProperty(EndpointOverrideURL, flow_file);
-  if (endpoint_override_url) {
-    properties.endpoint_override_url = *endpoint_override_url;
-    logger_->log_debug("S3Processor: Endpoint Override URL [{}]", properties.endpoint_override_url);
-  }
-
-  return properties;
 }
 
 }  // namespace org::apache::nifi::minifi::aws::processors
diff --git a/extensions/aws/processors/S3Processor.h b/extensions/aws/processors/S3Processor.h
index fe628e27c..41beec73d 100644
--- a/extensions/aws/processors/S3Processor.h
+++ b/extensions/aws/processors/S3Processor.h
@@ -1,6 +1,4 @@
 /**
- * @file S3Processor.h
- * Base S3 processor class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -28,80 +26,21 @@
 #include <string_view>
 #include <utility>
 
-#include "aws/core/auth/AWSCredentialsProvider.h"
-#include "S3Wrapper.h"
 #include "AWSCredentialsProvider.h"
+#include "AwsProcessor.h"
+#include "S3Wrapper.h"
+#include "aws/core/auth/AWSCredentialsProvider.h"
+#include "core/Processor.h"
 #include "core/Property.h"
 #include "core/PropertyDefinition.h"
 #include "core/PropertyDefinitionBuilder.h"
-#include "minifi-cpp/core/PropertyValidator.h"
-#include "core/Processor.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerFactory.h"
 #include "utils/OptionalUtils.h"
 
 namespace org::apache::nifi::minifi::aws::processors {
 
-namespace region {
-inline constexpr std::string_view AF_SOUTH_1 = "af-south-1";
-inline constexpr std::string_view AP_EAST_1 = "ap-east-1";
-inline constexpr std::string_view AP_NORTHEAST_1 = "ap-northeast-1";
-inline constexpr std::string_view AP_NORTHEAST_2 = "ap-northeast-2";
-inline constexpr std::string_view AP_NORTHEAST_3 = "ap-northeast-3";
-inline constexpr std::string_view AP_SOUTH_1 = "ap-south-1";
-inline constexpr std::string_view AP_SOUTH_2 = "ap-south-2";
-inline constexpr std::string_view AP_SOUTHEAST_1 = "ap-southeast-1";
-inline constexpr std::string_view AP_SOUTHEAST_2 = "ap-southeast-2";
-inline constexpr std::string_view AP_SOUTHEAST_3 = "ap-southeast-3";
-inline constexpr std::string_view AP_SOUTHEAST_4 = "ap-southeast-4";
-inline constexpr std::string_view AP_SOUTHEAST_5 = "ap-southeast-5";
-inline constexpr std::string_view AP_SOUTHEAST_7 = "ap-southeast-7";
-inline constexpr std::string_view CA_CENTRAL_1 = "ca-central-1";
-inline constexpr std::string_view CA_WEST_1 = "ca-west-1";
-inline constexpr std::string_view CN_NORTH_1 = "cn-north-1";
-inline constexpr std::string_view CN_NORTHWEST_1 = "cn-northwest-1";
-inline constexpr std::string_view EU_CENTRAL_1 = "eu-central-1";
-inline constexpr std::string_view EU_CENTRAL_2 = "eu-central-2";
-inline constexpr std::string_view EU_ISOE_WEST_1 = "eu-isoe-west-1";
-inline constexpr std::string_view EU_NORTH_1 = "eu-north-1";
-inline constexpr std::string_view EU_SOUTH_1 = "eu-south-1";
-inline constexpr std::string_view EU_SOUTH_2 = "eu-south-2";
-inline constexpr std::string_view EU_WEST_1 = "eu-west-1";
-inline constexpr std::string_view EU_WEST_2 = "eu-west-2";
-inline constexpr std::string_view EU_WEST_3 = "eu-west-3";
-inline constexpr std::string_view IL_CENTRAL_1 = "il-central-1";
-inline constexpr std::string_view ME_CENTRAL_1 = "me-central-1";
-inline constexpr std::string_view ME_SOUTH_1 = "me-south-1";
-inline constexpr std::string_view MX_CENTRAL_1 = "mx-central-1";
-inline constexpr std::string_view SA_EAST_1 = "sa-east-1";
-inline constexpr std::string_view US_EAST_1 = "us-east-1";
-inline constexpr std::string_view US_EAST_2 = "us-east-2";
-inline constexpr std::string_view US_GOV_EAST_1 = "us-gov-east-1";
-inline constexpr std::string_view US_GOV_WEST_1 = "us-gov-west-1";
-inline constexpr std::string_view US_ISO_EAST_1 = "us-iso-east-1";
-inline constexpr std::string_view US_ISO_WEST_1 = "us-iso-west-1";
-inline constexpr std::string_view US_ISOB_EAST_1 = "us-isob-east-1";
-inline constexpr std::string_view US_ISOF_EAST_1 = "us-isof-east-1";
-inline constexpr std::string_view US_ISOF_SOUTH_1 = "us-isof-south-1";
-inline constexpr std::string_view US_WEST_1 = "us-west-1";
-inline constexpr std::string_view US_WEST_2 = "us-west-2";
-
-inline constexpr auto REGIONS = std::array{
-  AF_SOUTH_1, AP_EAST_1, AP_NORTHEAST_1, AP_NORTHEAST_2, AP_NORTHEAST_3, AP_SOUTH_1, AP_SOUTH_2, AP_SOUTHEAST_1, AP_SOUTHEAST_2, AP_SOUTHEAST_3, AP_SOUTHEAST_4, AP_SOUTHEAST_5, AP_SOUTHEAST_7,
-  CA_CENTRAL_1, CA_WEST_1, CN_NORTH_1, CN_NORTHWEST_1, EU_CENTRAL_1, EU_CENTRAL_2, EU_ISOE_WEST_1, EU_NORTH_1, EU_SOUTH_1, EU_SOUTH_2, EU_WEST_1, EU_WEST_2, EU_WEST_3, IL_CENTRAL_1, ME_CENTRAL_1,
-  ME_SOUTH_1, MX_CENTRAL_1, SA_EAST_1, US_EAST_1, US_EAST_2, US_GOV_EAST_1, US_GOV_WEST_1, US_ISO_EAST_1, US_ISO_WEST_1, US_ISOB_EAST_1, US_ISOF_EAST_1, US_ISOF_SOUTH_1, US_WEST_1, US_WEST_2
-};
-}  // namespace region
-
-struct CommonProperties {
-  std::string bucket;
-  std::string object_key;
-  Aws::Auth::AWSCredentials credentials;
-  aws::s3::ProxyOptions proxy;
-  std::string endpoint_override_url;
-};
-
-class S3Processor : public core::ProcessorImpl {
+class S3Processor : public AwsProcessor {
  public:
   EXTENSIONAPI static constexpr auto Bucket = core::PropertyDefinitionBuilder<>::createProperty("Bucket")
       .withDescription("The S3 bucket")
@@ -109,96 +48,16 @@ class S3Processor : public core::ProcessorImpl {
       .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
       .supportsExpressionLanguage(true)
       .build();
-  EXTENSIONAPI static constexpr auto AccessKey = core::PropertyDefinitionBuilder<>::createProperty("Access Key")
-      .withDescription("AWS account access key")
-      .supportsExpressionLanguage(true)
-      .build();
-  EXTENSIONAPI static constexpr auto SecretKey = core::PropertyDefinitionBuilder<>::createProperty("Secret Key")
-      .withDescription("AWS account secret key")
-      .supportsExpressionLanguage(true)
-      .isSensitive(true)
-      .build();
-  EXTENSIONAPI static constexpr auto CredentialsFile = core::PropertyDefinitionBuilder<>::createProperty("Credentials File")
-      .withDescription("Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey")
-      .build();
-  EXTENSIONAPI static constexpr auto AWSCredentialsProviderService = core::PropertyDefinitionBuilder<>::createProperty("AWS Credentials Provider service")
-      .withDescription("The name of the AWS Credentials Provider controller service that is used to obtain AWS credentials.")
-      .build();
-  EXTENSIONAPI static constexpr auto Region = core::PropertyDefinitionBuilder<region::REGIONS.size()>::createProperty("Region")
-      .isRequired(true)
-      .withDefaultValue(region::US_WEST_2)
-      .withAllowedValues(region::REGIONS)
-      .withDescription("AWS Region")
-      .build();
-  EXTENSIONAPI static constexpr auto CommunicationsTimeout = core::PropertyDefinitionBuilder<>::createProperty("Communications Timeout")
-      .isRequired(true)
-      .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
-      .withDefaultValue("30 sec")
-      .withDescription("Sets the timeout of the communication between the AWS server and the client")
-      .build();
-  EXTENSIONAPI static constexpr auto EndpointOverrideURL = core::PropertyDefinitionBuilder<>::createProperty("Endpoint Override URL")
-      .withDescription("Endpoint URL to use instead of the AWS default including scheme, host, "
-          "port, and path. The AWS libraries select an endpoint URL based on the AWS "
-          "region, but this property overrides the selected endpoint URL, allowing use "
-          "with other S3-compatible endpoints.")
-      .supportsExpressionLanguage(true)
-      .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
-      .build();
-  EXTENSIONAPI static constexpr auto ProxyHost = core::PropertyDefinitionBuilder<>::createProperty("Proxy Host")
-      .withDescription("Proxy host name or IP")
-      .supportsExpressionLanguage(true)
-      .build();
-  EXTENSIONAPI static constexpr auto ProxyPort = core::PropertyDefinitionBuilder<>::createProperty("Proxy Port")
-      .withDescription("The port number of the proxy host")
-      .supportsExpressionLanguage(true)
-      .build();
-  EXTENSIONAPI static constexpr auto ProxyUsername = core::PropertyDefinitionBuilder<>::createProperty("Proxy Username")
-      .withDescription("Username to set when authenticating against proxy")
-      .supportsExpressionLanguage(true)
-      .build();
-  EXTENSIONAPI static constexpr auto ProxyPassword = core::PropertyDefinitionBuilder<>::createProperty("Proxy Password")
-      .withDescription("Password to set when authenticating against proxy")
-      .supportsExpressionLanguage(true)
-      .isSensitive(true)
-      .build();
-  EXTENSIONAPI static constexpr auto UseDefaultCredentials = core::PropertyDefinitionBuilder<>::createProperty("Use Default Credentials")
-      .withDescription("If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.")
-      .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
-      .withDefaultValue("false")
-      .isRequired(true)
-      .build();
-  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
-      Bucket,
-      AccessKey,
-      SecretKey,
-      CredentialsFile,
-      AWSCredentialsProviderService,
-      Region,
-      CommunicationsTimeout,
-      EndpointOverrideURL,
-      ProxyHost,
-      ProxyPort,
-      ProxyUsername,
-      ProxyPassword,
-      UseDefaultCredentials
-  });
 
+  EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(AwsProcessor::Properties, std::to_array<core::PropertyReference>({Bucket}));
 
-  explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger);
 
+  explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger);
   void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
 
  protected:
   explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender);
-
-  std::optional<Aws::Auth::AWSCredentials> getAWSCredentialsFromControllerService(core::ProcessContext& context) const;
-  std::optional<Aws::Auth::AWSCredentials> getAWSCredentials(core::ProcessContext& context, const core::FlowFile* const flow_file);
-  std::optional<aws::s3::ProxyOptions> getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file);
-  std::optional<CommonProperties> getCommonELSupportedProperties(core::ProcessContext& context, const core::FlowFile* const flow_file);
-
-  std::shared_ptr<core::logging::Logger> logger_;
   aws::s3::S3Wrapper s3_wrapper_;
-  std::optional<Aws::Client::ClientConfiguration> client_config_;
 };
 
 }  // namespace org::apache::nifi::minifi::aws::processors
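
The shape of the refactor: region, credentials, proxy, and timeout plumbing moved into the new AwsProcessor base (shared by the S3 processors and PutKinesisStream), and each subclass composes its property table at compile time. A reduced sketch of that array_cat composition pattern (element type simplified from core::PropertyReference):

    #include <array>
    #include <cstddef>
    #include <string_view>

    template <typename T, std::size_t N, std::size_t M>
    constexpr std::array<T, N + M> array_cat(const std::array<T, N>& a, const std::array<T, M>& b) {
      std::array<T, N + M> result{};
      for (std::size_t i = 0; i < N; ++i) result[i] = a[i];
      for (std::size_t i = 0; i < M; ++i) result[N + i] = b[i];
      return result;
    }

    struct AwsBase {
      static constexpr auto Properties = std::to_array<std::string_view>({"Region", "Access Key"});
    };
    struct S3Like {
      static constexpr auto Properties = array_cat(AwsBase::Properties, std::to_array<std::string_view>({"Bucket"}));
    };
    static_assert(S3Like::Properties.size() == 3);
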
diff --git a/extensions/aws/s3/S3ClientRequestSender.cpp b/extensions/aws/s3/S3ClientRequestSender.cpp
index dd6c3025b..6557288a7 100644
--- a/extensions/aws/s3/S3ClientRequestSender.cpp
+++ b/extensions/aws/s3/S3ClientRequestSender.cpp
@@ -1,6 +1,4 @@
 /**
- * @file S3ClientRequestSender.cpp
- * S3ClientRequestSender class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/extensions/aws/s3/S3ClientRequestSender.h b/extensions/aws/s3/S3ClientRequestSender.h
index 86bfc96e1..7da5fa2fc 100644
--- a/extensions/aws/s3/S3ClientRequestSender.h
+++ b/extensions/aws/s3/S3ClientRequestSender.h
@@ -1,6 +1,4 @@
 /**
- * @file S3Wrapper.h
- * S3Wrapper class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/extensions/aws/s3/S3RequestSender.h b/extensions/aws/s3/S3RequestSender.h
index a2cd2078c..0c92ad117 100644
--- a/extensions/aws/s3/S3RequestSender.h
+++ b/extensions/aws/s3/S3RequestSender.h
@@ -1,6 +1,4 @@
 /**
- * @file S3RequestSender.h
- * S3RequestSender class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -55,13 +53,6 @@
 
 namespace org::apache::nifi::minifi::aws::s3 {
 
-struct ProxyOptions {
-  std::string host;
-  uint32_t port = 0;
-  std::string username;
-  std::string password;
-};
-
 class S3RequestSender {
  public:
   virtual std::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index aff06b72f..70972bbea 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -1,6 +1,4 @@
 /**
- * @file S3Wrapper.cpp
- * S3Wrapper class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h
index d943076ba..76756da09 100644
--- a/extensions/aws/s3/S3Wrapper.h
+++ b/extensions/aws/s3/S3Wrapper.h
@@ -1,6 +1,4 @@
 /**
- * @file S3Wrapper.h
- * S3Wrapper class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -29,26 +27,26 @@
 #include <utility>
 #include <vector>
 
-#include "aws/s3/model/StorageClass.h"
-#include "aws/s3/model/ServerSideEncryption.h"
+#include "Exception.h"
+#include "MultipartUploadStateStorage.h"
+#include "S3RequestSender.h"
 #include "aws/s3/model/ObjectCannedACL.h"
+#include "aws/s3/model/ServerSideEncryption.h"
+#include "aws/s3/model/StorageClass.h"
 #include "aws/s3/model/ChecksumAlgorithm.h"
 
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerFactory.h"
+#include "io/InputStream.h"
+#include "io/OutputStream.h"
+#include "range/v3/algorithm/find.hpp"
 #include "utils/AWSInitializer.h"
 #include "utils/ConfigurationUtils.h"
+#include "utils/ListingStateManager.h"
 #include "utils/OptionalUtils.h"
 #include "utils/StringUtils.h"
-#include "utils/ListingStateManager.h"
 #include "utils/gsl.h"
-#include "S3RequestSender.h"
-#include "Exception.h"
-#include "MultipartUploadStateStorage.h"
-#include "range/v3/algorithm/find.hpp"
-#include "utils/Literals.h"
-#include "io/InputStream.h"
-#include "io/OutputStream.h"
+#include "utils/ProxyOptions.h"
 
 namespace org::apache::nifi::minifi::aws::s3 {
 
@@ -128,7 +126,7 @@ struct RequestParameters {
   Aws::Auth::AWSCredentials credentials;
   Aws::Client::ClientConfiguration client_config;
 
-  void setClientConfig(const aws::s3::ProxyOptions& proxy, const std::string& endpoint_override_url) {
+  void setClientConfig(const aws::ProxyOptions& proxy, const std::string& endpoint_override_url) {
     client_config.proxyHost = proxy.host;
     client_config.proxyPort = proxy.port;
     client_config.proxyUserName = proxy.username;
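
ProxyOptions is hoisted out of the s3 namespace into the new extensions/aws/utils/ProxyOptions.h (listed in the diffstat) so the Kinesis code can share it. Judging from the struct removed from S3RequestSender.h above, the relocated definition presumably keeps the same shape; a sketch, not the verbatim new file:

    #include <cstdint>
    #include <string>

    namespace org::apache::nifi::minifi::aws {

    struct ProxyOptions {
      std::string host;
      uint32_t port = 0;
      std::string username;
      std::string password;
    };

    }  // namespace org::apache::nifi::minifi::aws
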
diff --git a/extensions/aws/tests/PutKinesisStreamTests.cpp b/extensions/aws/tests/PutKinesisStreamTests.cpp
new file mode 100644
index 000000000..bbfda2b9c
--- /dev/null
+++ b/extensions/aws/tests/PutKinesisStreamTests.cpp
@@ -0,0 +1,318 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <minifi-cpp/core/FlowFile.h>
+
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/Resource.h"
+#include "processors/PutKinesisStream.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
+
+namespace org::apache::nifi::minifi::aws::processors::test {
+
+class MockKinesisClient final : public Aws::Kinesis::KinesisClient {
+ public:
+  enum class KinesisBehaviour {
+    HappyPath,
+    Failure,
+    RecordSizeMismatch,
+    OddsFail,
+  };
+
+  Aws::Kinesis::Model::PutRecordsOutcome PutRecords(const Aws::Kinesis::Model::PutRecordsRequest& request) const override {
+    switch (behaviour_) {
+      case KinesisBehaviour::HappyPath: {
+        Aws::Kinesis::Model::PutRecordsResult result;
+        for ([[maybe_unused]] const auto& request_entry : request.GetRecords()) {
+          Aws::Kinesis::Model::PutRecordsResultEntry result_entry;
+          result_entry.SetSequenceNumber(fmt::format("sequence_number_{}", ++sequence_number_));
+          result_entry.SetShardId("shard_id");
+          result.AddRecords(result_entry);
+        }
+        return result;
+      }
+      case KinesisBehaviour::Failure: {
+        auto err = Aws::Kinesis::KinesisError();
+        err.SetResponseCode(Aws::Http::HttpResponseCode::UNAUTHORIZED);
+        err.SetMessage("Unauthorized");
+        return err;
+      }
+      case KinesisBehaviour::RecordSizeMismatch: {
+        Aws::Kinesis::Model::PutRecordsResult result;
+        return result;
+      }
+      case KinesisBehaviour::OddsFail: {
+        Aws::Kinesis::Model::PutRecordsResult result;
+        uint8_t i = 0;
+        for ([[maybe_unused]] const auto& request_entry : request.GetRecords()) {
+          Aws::Kinesis::Model::PutRecordsResultEntry result_entry;
+          if (++i % 2 == 0) {
+            result_entry.SetErrorCode("8");
+            result_entry.SetErrorMessage("Some error message");
+          } else {
+            result_entry.SetSequenceNumber(fmt::format("sequence_number_{}", ++sequence_number_));
+            result_entry.SetShardId("shard_id");
+          }
+          result.AddRecords(result_entry);
+        }
+        return result;
+      }
+      default: { throw std::invalid_argument("Unknown behaviour"); }
+    }
+  }
+
+  KinesisBehaviour behaviour_ = KinesisBehaviour::HappyPath;
+  mutable uint32_t sequence_number_ = 0;
+};
+
+class PutKinesisStreamMocked final : public aws::processors::PutKinesisStream {
+ public:
+  static constexpr const char* Description = "PutKinesisStreamMocked";
+
+  explicit PutKinesisStreamMocked(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+  : PutKinesisStream(name, uuid) {
+  }
+
+  PutKinesisStreamMocked(const PutKinesisStreamMocked&) = delete;
+  PutKinesisStreamMocked(PutKinesisStreamMocked&&) = delete;
+  PutKinesisStreamMocked& operator=(const PutKinesisStreamMocked&) = delete;
+  PutKinesisStreamMocked& operator=(PutKinesisStreamMocked&&) = delete;
+
+  ~PutKinesisStreamMocked() override = default;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  std::unique_ptr<Aws::Kinesis::KinesisClient> getClient(const Aws::Auth::AWSCredentials&) override {
+    auto client = std::make_unique<MockKinesisClient>();
+    client->behaviour_ = behaviour_;
+    return client;
+  }
+
+  MockKinesisClient::KinesisBehaviour behaviour_ = MockKinesisClient::KinesisBehaviour::HappyPath;
+};
+REGISTER_RESOURCE(PutKinesisStreamMocked, Processor);
+
+TEST_CASE("PutKinesisStream record size mismatch path") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = 
dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor());
+  REQUIRE(put_kinesis_stream);
+  put_kinesis_stream->behaviour_ = 
MockKinesisClient::KinesisBehaviour::RecordSizeMismatch;
+
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+
+
+  const auto result = controller.trigger({{.content = "foo"}, {.content = 
"bar"}});
+  CHECK(result.at(PutKinesisStream::Success).empty());
+  REQUIRE(result.at(PutKinesisStream::Failure).size() == 2);
+  const auto res_ff_1 = result.at(PutKinesisStream::Failure).at(0);
+  const auto res_ff_2 = result.at(PutKinesisStream::Failure).at(1);
+
+  CHECK(controller.plan->getContent(res_ff_1) == "foo");
+  CHECK(controller.plan->getContent(res_ff_2) == "bar");
+
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) 
== "Record size mismatch");
+  CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) 
== "Record size mismatch");
+}
+
+TEST_CASE("PutKinesisStream record size failure path") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = 
dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor());
+  REQUIRE(put_kinesis_stream);
+  put_kinesis_stream->behaviour_ = 
MockKinesisClient::KinesisBehaviour::Failure;
+
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+
+
+  const auto result = controller.trigger({{.content = "foo"}, {.content = 
"bar"}});
+  CHECK(result.at(PutKinesisStream::Success).empty());
+  REQUIRE(result.at(PutKinesisStream::Failure).size() == 2);
+  const auto res_ff_1 = result.at(PutKinesisStream::Failure).at(0);
+  const auto res_ff_2 = result.at(PutKinesisStream::Failure).at(1);
+
+  CHECK(controller.plan->getContent(res_ff_1) == "foo");
+  CHECK(controller.plan->getContent(res_ff_2) == "bar");
+
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) 
== "Unauthorized");
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == 
"401");
+  CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == 
"401");
+  CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) 
== "Unauthorized");
+}
+
+TEST_CASE("PutKinesisStream partial failure path") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = 
dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor());
+  REQUIRE(put_kinesis_stream);
+  put_kinesis_stream->behaviour_ = 
MockKinesisClient::KinesisBehaviour::OddsFail;
+
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+
+
+  const auto result = controller.trigger({{.content = "foo"}, {.content = 
"bar"}});
+  REQUIRE(result.at(PutKinesisStream::Success).size() == 1);
+  REQUIRE(result.at(PutKinesisStream::Failure).size() == 1);
+  const auto succ_ff = result.at(PutKinesisStream::Success).at(0);
+  const auto fail_ff = result.at(PutKinesisStream::Failure).at(0);
+
+  CHECK(controller.plan->getContent(succ_ff) == "foo");
+  CHECK(controller.plan->getContent(fail_ff) == "bar");
+
+  CHECK(succ_ff->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) 
== "sequence_number_1");
+  CHECK(succ_ff->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == 
"shard_id");
+  CHECK(fail_ff->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == 
"8");
+  CHECK(fail_ff->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) 
== "Some error message");
+}
+
+
+TEST_CASE("PutKinesisStream simple happy path") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+
+
+  const auto result = controller.trigger({{.content = "foo"}, {.content = "bar"}});
+  CHECK(result.at(PutKinesisStream::Failure).empty());
+  REQUIRE(result.at(PutKinesisStream::Success).size() == 2);
+  const auto res_ff_1 = result.at(PutKinesisStream::Success).at(0);
+  const auto res_ff_2 = result.at(PutKinesisStream::Success).at(1);
+
+  CHECK(controller.plan->getContent(res_ff_1) == "foo");
+  CHECK(controller.plan->getContent(res_ff_2) == "bar");
+
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_1");
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id");
+  CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_2");
+  CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id");
+}
+
+TEST_CASE("PutKinesisStream smaller batch size than available ffs") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MessageBatchSize, "10");
+
+  const auto result = controller.trigger({
+    {.content = "Lorem"},
+    {.content = "ipsum"},
+    {.content = "dolor"},
+    {.content = "sit"},
+    {.content = "amet"},
+    {.content = "consectetur"},
+    {.content = "adipiscing"},
+    {.content = "elit"},
+    {.content = "Morbi"},
+    {.content = "dapibus"},
+    {.content = "risus"},
+    {.content = "a"},
+    {.content = "bibendum"},
+    {.content = "luctus"}});
+
+  CHECK(result.at(PutKinesisStream::Success).size() == 10);
+}
+
+TEST_CASE("PutKinesisStream max batch data size fills up") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MessageBatchSize, "10");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MaxBatchDataSize, "12 B");
+
+  const auto result = controller.trigger({
+    {.content = "Lorem"},
+    {.content = "ipsum"},
+    {.content = "dolor"},
+    {.content = "sit"},
+    {.content = "amet"},
+    {.content = "consectetur"},
+    {.content = "adipiscing"},
+    {.content = "elit"},
+    {.content = "Morbi"},
+    {.content = "dapibus"},
+    {.content = "risus"},
+    {.content = "a"},
+    {.content = "bibendum"},
+    {.content = "luctus"}});
+
+  REQUIRE(result.at(PutKinesisStream::Success).size() == 3);
+  CHECK(controller.plan->getContent(result.at(PutKinesisStream::Success).at(0)) == "Lorem");
+  CHECK(controller.plan->getContent(result.at(PutKinesisStream::Success).at(1)) == "ipsum");
+  CHECK(controller.plan->getContent(result.at(PutKinesisStream::Success).at(2)) == "dolor");
+}
+
+TEST_CASE("PutKinesisStream max batch data size to different streams") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MessageBatchSize, "10");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MaxBatchDataSize, "12 B");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "${stream_name}");
+
+  const auto result = controller.trigger({
+    {.content = "Lorem", .attributes = {{"stream_name", "stream_one"}}},
+    {.content = "ipsum", .attributes = {{"stream_name", "stream_two"}}},
+    {.content = "dolor", .attributes = {{"stream_name", "stream_three"}}},
+    {.content = "sit", .attributes = {{"stream_name", "stream_four"}}},
+    {.content = "amet", .attributes = {{"stream_name", "stream_five"}}},
+    {.content = "consectetur", .attributes = {{"stream_name", "stream_six"}}},
+    {.content = "adipiscing", .attributes = {{"stream_name", "stream_seven"}}},
+    {.content = "elit", .attributes = {{"stream_name", "stream_eight"}}},
+    {.content = "Morbi", .attributes = {{"stream_name", "stream_nine"}}},
+    {.content = "dapibus", .attributes = {{"stream_name", "stream_ten"}}},
+    {.content = "risus", .attributes = {{"stream_name", "stream_eleven"}}},
+    {.content = "a", .attributes = {{"stream_name", "stream_twelve"}}},
+    {.content = "bibendum", .attributes = {{"stream_name", 
"stream_thirteen"}}},
+    {.content = "luctus", .attributes = {{"stream_name", 
"stream_fourteen"}}}});
+
+  CHECK(result.at(PutKinesisStream::Success).size() == 10);
+}
+
+TEST_CASE("PutKinesisStream with too large message") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+
+  std::string too_large_msg((1_MB + 10), 'x');
+  const auto result = controller.trigger(too_large_msg);
+  REQUIRE(result.at(PutKinesisStream::Failure).size() == 1);
+  CHECK(result.at(PutKinesisStream::Success).empty());
+
+  const auto res_ff_1 = result.at(PutKinesisStream::Failure).at(0);
+
+  CHECK(controller.plan->getContent(res_ff_1) == too_large_msg);
+
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "record too big 1000010, max allowed 1000000");
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == std::nullopt);
+}
+
+}  // namespace org::apache::nifi::minifi::aws::processors::test
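
A note for reviewers on the batching behaviour pinned down by the tests above: one trigger takes at most Message Batch Size flow files, and stops early once the running payload total reaches Max Batch Data Size, keeping the record that crosses the cap (hence "Lorem" + "ipsum" + "dolor" = 15 bytes succeeding under a 12 B cap); any single record over the 1 MB Kinesis limit is routed straight to failure. The sketch below models only this intake rule; it is illustrative, and recordsPerTrigger is a hypothetical helper, not code from this commit.

    // Hypothetical model of the per-trigger intake rule asserted by the tests above.
    #include <cstddef>
    #include <string>
    #include <vector>

    size_t recordsPerTrigger(const std::vector<std::string>& queued_payloads,
                             size_t batch_size, size_t max_data_size) {
      size_t taken = 0;
      size_t bytes = 0;
      for (const auto& payload : queued_payloads) {
        if (taken >= batch_size) break;     // Message Batch Size cap
        ++taken;
        bytes += payload.size();
        if (bytes >= max_data_size) break;  // Max Batch Data Size cap; the crossing record is kept
      }
      return taken;
    }

With the fourteen words used above, this returns 10 when only Message Batch Size is 10, and 3 when Max Batch Data Size is additionally capped at 12 B, matching the two REQUIREs.
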
diff --git a/extensions/aws/utils/AWSInitializer.cpp b/extensions/aws/utils/AWSInitializer.cpp
index b870e2d3a..08dcb161a 100755
--- a/extensions/aws/utils/AWSInitializer.cpp
+++ b/extensions/aws/utils/AWSInitializer.cpp
@@ -1,6 +1,4 @@
 /**
- * @file AWSInitializer.cpp
- * AWSInitializer class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -28,12 +26,7 @@
 #include "aws/core/platform/Environment.h"
 #include "AWSSdkLogger.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace aws {
-namespace utils {
+namespace org::apache::nifi::minifi::aws::utils {
 
 AWSInitializer& AWSInitializer::get() {
   static AWSInitializer instance;
@@ -47,13 +40,7 @@ AWSInitializer::~AWSInitializer() {
 
 AWSInitializer::AWSInitializer() {
   Aws::InitAPI(options_);
-  Aws::Utils::Logging::InitializeAWSLogging(
-      std::make_shared<AWSSdkLogger>());
+  Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSSdkLogger>());
 }
 
-}  // namespace utils
-}  // namespace aws
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::aws::utils
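
The initializer itself is a Meyers singleton: the first call to AWSInitializer::get() runs Aws::InitAPI() and installs the SDK logger, and the matching Aws::ShutdownAPI() presumably runs in the destructor (its body is outside this hunk). A hedged usage sketch follows; the class and member names are illustrative, not from this commit.

    #include "utils/AWSInitializer.h"

    class SomeAwsComponent {
      // Holding the reference guarantees the SDK is initialized before any AWS
      // call this component makes; teardown happens once, at static destruction.
      const org::apache::nifi::minifi::aws::utils::AWSInitializer& aws_initializer_ =
          org::apache::nifi::minifi::aws::utils::AWSInitializer::get();
    };
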
diff --git a/extensions/aws/utils/AWSInitializer.h b/extensions/aws/utils/AWSInitializer.h
index 0720b7a8e..34b3f2f38 100755
--- a/extensions/aws/utils/AWSInitializer.h
+++ b/extensions/aws/utils/AWSInitializer.h
@@ -1,6 +1,4 @@
 /**
- * @file AWSInitializer.h
- * Initializing AWS SDK
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/extensions/aws/utils/AWSSdkLogger.cpp b/extensions/aws/utils/AWSSdkLogger.cpp
index cb67e4551..1514c3f31 100644
--- a/extensions/aws/utils/AWSSdkLogger.cpp
+++ b/extensions/aws/utils/AWSSdkLogger.cpp
@@ -1,6 +1,4 @@
 /**
- * @file AWSSdkLogger.cpp
- * AWSSdkLogger class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/extensions/aws/utils/AWSSdkLogger.h b/extensions/aws/utils/AWSSdkLogger.h
index 536336eb0..af8b2529d 100644
--- a/extensions/aws/utils/AWSSdkLogger.h
+++ b/extensions/aws/utils/AWSSdkLogger.h
@@ -1,6 +1,4 @@
 /**
- * @file AWSSdkLogger.h
- * AWS SDK Logger class
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/extensions/aws/utils/ProxyOptions.h b/extensions/aws/utils/ProxyOptions.h
new file mode 100644
index 000000000..8227c93b0
--- /dev/null
+++ b/extensions/aws/utils/ProxyOptions.h
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <cstdint>
+
+
+namespace org::apache::nifi::minifi::aws {
+
+struct ProxyOptions {
+  std::string host;
+  uint32_t port = 0;
+  std::string username;
+  std::string password;
+};
+
+}  // namespace org::apache::nifi::minifi::aws
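
The new ProxyOptions struct gives the S3 and Kinesis processors a shared, SDK-agnostic carrier for proxy settings. A minimal sketch of how such a value maps onto the AWS SDK's client configuration; applyProxy is an illustrative helper, not part of this commit, while the fields on Aws::Client::ClientConfiguration are the SDK's own.

    #include <aws/core/client/ClientConfiguration.h>
    #include "ProxyOptions.h"

    namespace org::apache::nifi::minifi::aws {

    // Copies the extension's proxy settings onto an AWS SDK client configuration.
    inline void applyProxy(const ProxyOptions& proxy, Aws::Client::ClientConfiguration& config) {
      if (proxy.host.empty()) return;  // no proxy configured
      config.proxyHost = proxy.host;
      config.proxyPort = proxy.port;
      config.proxyUserName = proxy.username;
      config.proxyPassword = proxy.password;
    }

    }  // namespace org::apache::nifi::minifi::aws
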
diff --git a/thirdparty/aws-sdk-cpp/dll-export-injection.patch b/thirdparty/aws-sdk-cpp/dll-export-injection.patch
index 1343f00c7..b73a91788 100644
--- a/thirdparty/aws-sdk-cpp/dll-export-injection.patch
+++ b/thirdparty/aws-sdk-cpp/dll-export-injection.patch
@@ -1,45 +1,17 @@
-diff -rupN a/generated/src/aws-cpp-sdk-s3/CMakeLists.txt b/generated/src/aws-cpp-sdk-s3/CMakeLists.txt
---- a/generated/src/aws-cpp-sdk-s3/CMakeLists.txt      2023-12-11 13:15:50.741732410 +0100
-+++ b/generated/src/aws-cpp-sdk-s3/CMakeLists.txt      2023-12-14 14:02:41.247487265 +0100
-@@ -59,6 +59,11 @@ if(USE_WINDOWS_DLL_SEMANTICS AND BUILD_S
-     target_compile_definitions(${PROJECT_NAME} PRIVATE "AWS_S3_EXPORTS")
- endif()
- 
-+if(FORCE_EXPORT_S3_API)
-+    target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_S3_API=__declspec(dllexport)")
-+    target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_S3_EXTERN=")
-+endif()
-+
- target_include_directories(${PROJECT_NAME} PUBLIC
-     $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
-     $<INSTALL_INTERFACE:include>)
-diff -rupN a/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h b/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h
---- a/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h 2023-12-11 13:15:50.741732410 +0100
-+++ b/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h 2023-12-14 13:30:54.022259949 +0100
-@@ -22,10 +22,13 @@
-             #define AWS_S3_API __declspec(dllimport)
-         #endif /* AWS_S3_EXPORTS */
-         #define AWS_S3_EXTERN
--    #else
--        #define AWS_S3_API
--        #define AWS_S3_EXTERN extern
-     #endif // USE_IMPORT_EXPORT
-+    #ifndef AWS_S3_API
-+        #define AWS_S3_API
-+    #endif
-+    #ifndef AWS_S3_EXTERN
-+        #define AWS_S3_EXTERN
-+    #endif
- #else // defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32)
-     #define AWS_S3_API
-     #define AWS_S3_EXTERN extern
-diff -rupN a/src/aws-cpp-sdk-core/CMakeLists.txt b/src/aws-cpp-sdk-core/CMakeLists.txt
---- a/src/aws-cpp-sdk-core/CMakeLists.txt      2023-12-11 13:15:52.061754319 +0100
-+++ b/src/aws-cpp-sdk-core/CMakeLists.txt      2023-12-14 14:01:42.666518935 +0100
-@@ -682,6 +682,11 @@ elseif (BUILD_SHARED_LIBS)
+Subject: [PATCH] dll-export-injection
+---
+Index: src/aws-cpp-sdk-core/CMakeLists.txt
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/src/aws-cpp-sdk-core/CMakeLists.txt b/src/aws-cpp-sdk-core/CMakeLists.txt
+--- a/src/aws-cpp-sdk-core/CMakeLists.txt      (revision 101ff00c3e4f5248673e853617d1731fbf844112)
++++ b/src/aws-cpp-sdk-core/CMakeLists.txt      (date 1743761315080)
+@@ -628,6 +628,11 @@
      target_compile_definitions(${PROJECT_NAME} PRIVATE "SMITHY_EXPORTS=1")
  endif()
- 
+
 +if(FORCE_EXPORT_CORE_API)
 +    target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_CORE_API=__declspec(dllexport)")
 +    target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_CORE_EXTERN=")
@@ -47,10 +19,15 @@ diff -rupN a/src/aws-cpp-sdk-core/CMakeLists.txt b/src/aws-cpp-sdk-core/CMakeLis
 +
  set_compiler_flags(${PROJECT_NAME})
  set_compiler_warnings(${PROJECT_NAME})
- 
-diff -rupN a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h b/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h
---- a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h     2023-12-11 13:15:52.061754319 +0100
-+++ b/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h     2023-12-14 13:31:41.699706791 +0100
+
+Index: src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h b/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h
+--- a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h     (revision 101ff00c3e4f5248673e853617d1731fbf844112)
++++ b/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h     (date 1743761315080)
 @@ -19,10 +19,13 @@
              #define  AWS_CORE_API __declspec(dllimport)
          #endif // AWS_CORE_EXPORTS
@@ -68,25 +45,125 @@ diff -rupN a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h b/src/aws-cpp-
      #define AWS_CORE_LOCAL
  #else // defined (USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32)
      #define AWS_CORE_API
-diff -rupN a/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h b/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h
---- a/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h 2023-12-11 13:15:52.065087708 +0100
-+++ b/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h 2023-12-14 10:53:36.150592052 +0100
+Index: generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h b/generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h
+--- a/generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h  (revision 101ff00c3e4f5248673e853617d1731fbf844112)
++++ b/generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h  (date 1743763015621)
+@@ -22,10 +22,13 @@
+             #define AWS_KINESIS_API __declspec(dllimport)
+         #endif /* AWS_KINESIS_EXPORTS */
+         #define AWS_KINESIS_EXTERN
+-    #else
+-        #define AWS_KINESIS_API
+-        #define AWS_KINESIS_EXTERN extern
+     #endif // USE_IMPORT_EXPORT
++    #ifndef AWS_KINESIS_API
++        #define AWS_KINESIS_API
++    #endif
++    #ifndef AWS_KINESIS_EXTERN
++        #define AWS_KINESIS_EXTERN
++    #endif
+ #else // defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32)
+     #define AWS_KINESIS_API
+     #define AWS_KINESIS_EXTERN extern
+Index: generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h b/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h
+--- a/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h (revision 101ff00c3e4f5248673e853617d1731fbf844112)
++++ b/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h (date 1743761315080)
+@@ -22,10 +22,13 @@
+             #define AWS_S3_API __declspec(dllimport)
+         #endif /* AWS_S3_EXPORTS */
+         #define AWS_S3_EXTERN
+-    #else
+-        #define AWS_S3_API
+-        #define AWS_S3_EXTERN extern
+     #endif // USE_IMPORT_EXPORT
++    #ifndef AWS_S3_API
++        #define AWS_S3_API
++    #endif
++    #ifndef AWS_S3_EXTERN
++        #define AWS_S3_EXTERN
++    #endif
+ #else // defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32)
+     #define AWS_S3_API
+     #define AWS_S3_EXTERN extern
+Index: generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt b/generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt
+--- a/generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt (revision 101ff00c3e4f5248673e853617d1731fbf844112)
++++ b/generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt (date 1743761496232)
+@@ -59,6 +59,11 @@
+     target_compile_definitions(${PROJECT_NAME} PRIVATE "AWS_KINESIS_EXPORTS")
+ endif()
+
++if(FORCE_EXPORT_KINESIS_API)
++    target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_KINESIS_API=__declspec(dllexport)")
++    target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_KINESIS_EXTERN=")
++endif()
++
+ target_include_directories(${PROJECT_NAME} PUBLIC
+     $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+     $<INSTALL_INTERFACE:include>)
+Index: src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h b/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h
+--- a/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h (revision 101ff00c3e4f5248673e853617d1731fbf844112)
++++ b/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h (date 1743761315080)
 @@ -6,6 +6,7 @@
- 
+
  #pragma once
- 
+
 +#include <aws/core/Core_EXPORTS.h>
  #include <aws/core/endpoint/AWSPartitions.h>
  #include <aws/core/endpoint/EndpointProviderBase.h>
  #include <aws/core/endpoint/EndpointParameter.h>
-diff -rupN a/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h b/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h
---- a/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h    2023-12-11 13:15:52.065087708 +0100
-+++ b/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h    2023-12-14 10:53:36.150592052 +0100
+Index: src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h b/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h
+--- a/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h    (revision 101ff00c3e4f5248673e853617d1731fbf844112)
++++ b/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h    (date 1743761315080)
 @@ -6,6 +6,7 @@
- 
+
  #pragma once
- 
+
 +#include <aws/core/Core_EXPORTS.h>
  #include <aws/core/endpoint/AWSEndpoint.h>
  #include <aws/core/client/AWSError.h>
  #include <aws/core/endpoint/EndpointParameter.h>
+Index: generated/src/aws-cpp-sdk-s3/CMakeLists.txt
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/generated/src/aws-cpp-sdk-s3/CMakeLists.txt b/generated/src/aws-cpp-sdk-s3/CMakeLists.txt
+--- a/generated/src/aws-cpp-sdk-s3/CMakeLists.txt      (revision 101ff00c3e4f5248673e853617d1731fbf844112)
++++ b/generated/src/aws-cpp-sdk-s3/CMakeLists.txt      (date 1743761315080)
+@@ -59,6 +59,11 @@
+     target_compile_definitions(${PROJECT_NAME} PRIVATE "AWS_S3_EXPORTS")
+ endif()
+
++if(FORCE_EXPORT_S3_API)
++    target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_S3_API=__declspec(dllexport)")
++    target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_S3_EXTERN=")
++endif()
++
+ target_include_directories(${PROJECT_NAME} PUBLIC
+     $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+     $<INSTALL_INTERFACE:include>)
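
For reviewers of the regenerated patch: it now also injects a FORCE_EXPORT_KINESIS_API switch alongside the existing S3 and Core ones, so that on Windows the Kinesis symbols, which the SDK headers would otherwise mark import-only, can be forced to __declspec(dllexport) when the SDK objects are linked into the extension DLL. Below is a condensed illustration of the patched macro logic in Kinesis_EXPORTS.h; it restates the hunk above rather than adding new code.

    #if defined(USE_WINDOWS_DLL_SEMANTICS) || defined(WIN32)
        #ifdef USE_IMPORT_EXPORT
            #ifdef AWS_KINESIS_EXPORTS
                #define AWS_KINESIS_API __declspec(dllexport)
            #else
                #define AWS_KINESIS_API __declspec(dllimport)
            #endif
            #define AWS_KINESIS_EXTERN
        #endif
        // The patch drops the old unconditional #else branch and adds these guarded
        // fallbacks, so a definition injected by CMake via target_compile_definitions()
        // (e.g. AWS_KINESIS_API=__declspec(dllexport)) is not redefined here:
        #ifndef AWS_KINESIS_API
            #define AWS_KINESIS_API
        #endif
        #ifndef AWS_KINESIS_EXTERN
            #define AWS_KINESIS_EXTERN
        #endif
    #else
        #define AWS_KINESIS_API
        #define AWS_KINESIS_EXTERN extern
    #endif
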
