fgerlits commented on code in PR #1927:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1927#discussion_r2039844296


##########
extensions/aws/CMakeLists.txt:
##########
@@ -33,14 +33,20 @@ add_minifi_library(minifi-aws SHARED ${SOURCES})
 target_link_libraries(minifi-aws PUBLIC ${LIBMINIFI} Threads::Threads)
 
 target_wholearchive_library_private(minifi-aws AWS::aws-cpp-sdk-s3)
+target_wholearchive_library_private(minifi-aws AWS::aws-cpp-sdk-kinesis)
 if(CMAKE_SYSTEM_PROCESSOR MATCHES "(arm64)|(ARM64)|(aarch64)|(armv8)")
     target_wholearchive_library_private(minifi-aws AWS::aws-checksums)
 endif()
-get_target_property(AWS_SDK_INCLUDE_DIRS AWS::aws-cpp-sdk-s3 
INTERFACE_INCLUDE_DIRECTORIES)
+get_target_property(AWS_SDK_S3_INCLUDE_DIRS AWS::aws-cpp-sdk-s3 
INTERFACE_INCLUDE_DIRECTORIES)
+get_target_property(AWS_SDK_KINESIS_INCLUDE_DIRS AWS::aws-cpp-sdk-kinesis 
INTERFACE_INCLUDE_DIRECTORIES)
+
 target_include_directories(minifi-aws INTERFACE ${AWS_SDK_INCLUDE_DIRS})

Review Comment:
   Should this be
   ```suggestion
   target_include_directories(minifi-aws INTERFACE ${AWS_SDK_S3_INCLUDE_DIRS})
   ```
   ?
   
   Maybe not, since it seems to work as it is, but it's not clear to me why.



##########
extensions/aws/utils/ProxyOptions.h:
##########
@@ -0,0 +1,35 @@
+/**
+ * @file S3RequestSender.h
+ * S3RequestSender class declaration
+ *

Review Comment:
   here too, the `@file S3RequestSender.h` doc header looks copied from another file — please update it to match `ProxyOptions.h`, or remove it



##########
extensions/aws/tests/PutKinesisStreamTests.cpp:
##########
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <minifi-cpp/core/FlowFile.h>
+
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/Resource.h"
+#include "processors/PutKinesisStream.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
+
+namespace org::apache::nifi::minifi::aws::processors::test {
+
+class MockKinesisClient final : public Aws::Kinesis::KinesisClient {
+  Aws::Kinesis::Model::PutRecordsOutcome PutRecords(const 
Aws::Kinesis::Model::PutRecordsRequest& request) const override {
+    Aws::Kinesis::Model::PutRecordsResult result;
+    for ([[maybe_unused]] const auto& request_entry : request.GetRecords()) {
+      Aws::Kinesis::Model::PutRecordsResultEntry result_entry;
+      result_entry.SetSequenceNumber(fmt::format("sequence_number_{}", 
++sequence_number_));
+      result_entry.SetShardId("shard_id");
+      result.AddRecords(result_entry);
+    }
+    return result;
+  }
+
+  mutable uint32_t sequence_number_ = 0;
+};
+
+class PutKinesisStreamMocked final : public aws::processors::PutKinesisStream {
+ public:
+  static constexpr const char* Description = "PutKinesisStreamMocked";
+
+  explicit PutKinesisStreamMocked(const std::string& name, const 
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+  : PutKinesisStream(name, uuid) {
+  }
+
+  PutKinesisStreamMocked(const PutKinesisStreamMocked&) = delete;
+  PutKinesisStreamMocked(PutKinesisStreamMocked&&) = delete;
+  PutKinesisStreamMocked& operator=(const PutKinesisStreamMocked&) = delete;
+  PutKinesisStreamMocked& operator=(PutKinesisStreamMocked&&) = delete;
+
+  ~PutKinesisStreamMocked() override = default;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  std::unique_ptr<Aws::Kinesis::KinesisClient> getClient(const 
Aws::Auth::AWSCredentials&) override {
+    return std::make_unique<MockKinesisClient>();
+  }
+};
+REGISTER_RESOURCE(PutKinesisStreamMocked, Processor);
+
+TEST_CASE("PutKinesisStream simple happy path") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+
+
+  const auto result = controller.trigger({{.content = "foo"}, {.content = 
"bar"}});
+  CHECK(result.at(PutKinesisStream::Failure).empty());
+  CHECK(result.at(PutKinesisStream::Success).size() == 2);
+  const auto res_ff_1 = result.at(PutKinesisStream::Success).at(0);
+  const auto res_ff_2 = result.at(PutKinesisStream::Success).at(1);
+
+  CHECK(controller.plan->getContent(res_ff_1) == "foo");
+  CHECK(controller.plan->getContent(res_ff_2) == "bar");
+
+  
CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) 
== "sequence_number_1");
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == 
"shard_id");
+  
CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) 
== "sequence_number_2");
+  CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == 
"shard_id");
+}
+
+TEST_CASE("PutKinesisStream smaller batch size than available ffs") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::MessageBatchSize, "10");
+
+  const auto result = controller.trigger({
+    {.content = "Lorem"},
+    {.content = "ipsum"},
+    {.content = "dolor"},
+    {.content = "sit"},
+    {.content = "amet"},
+    {.content = "consectetur"},
+    {.content = "adipiscing"},
+    {.content = "elit"},
+    {.content = "Morbi"},
+    {.content = "dapibus"},
+    {.content = "risus"},
+    {.content = "a"},
+    {.content = "bibendum"},
+    {.content = "luctus"}});
+
+  CHECK(result.at(PutKinesisStream::Success).size() == 10);
+}
+
+TEST_CASE("PutKinesisStream max batch data size fills up") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::MessageBatchSize, "10");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::MaxBatchDataSize, "12 B");
+
+  const auto result = controller.trigger({
+    {.content = "Lorem"},
+    {.content = "ipsum"},
+    {.content = "dolor"},
+    {.content = "sit"},
+    {.content = "amet"},
+    {.content = "consectetur"},
+    {.content = "adipiscing"},
+    {.content = "elit"},
+    {.content = "Morbi"},
+    {.content = "dapibus"},
+    {.content = "risus"},
+    {.content = "a"},
+    {.content = "bibendum"},
+    {.content = "luctus"}});
+
+  CHECK(result.at(PutKinesisStream::Success).size() == 3);
+}
+
+TEST_CASE("PutKinesisStream max batch data size to different streams") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");

Review Comment:
   line 151 could be removed, as it is overwritten by line 154



##########
extensions/aws/processors/PutKinesisStream.cpp:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutKinesisStream.h"
+
+#include <memory>
+#include <random>
+#include <unordered_map>
+
+#include "aws/kinesis/KinesisClient.h"
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+void PutKinesisStream::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void PutKinesisStream::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& session_factory) {
+  AwsProcessor::onSchedule(context, session_factory);
+
+  batch_size_ = parseU64Property(context, MessageBatchSize);
+  if (batch_size_ == 0 || batch_size_ > 500) {
+    logger_->log_warn("PutKinesisStream::MessageBatchSize is invalid. Setting 
it to the maximum 500 value.");
+    batch_size_ = 500;
+  }
+  batch_data_size_soft_cap_ = parseDataSizeProperty(context, MaxBatchDataSize);
+  if (batch_data_size_soft_cap_ > 4_MB) {
+    logger_->log_warn("PutKinesisStream::MaxMessageBufferSize is invalid. 
Setting it to the maximum 4 MB value.");
+    batch_data_size_soft_cap_ = 4_MB;
+  }
+
+  endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | 
minifi::utils::toOptional();
+}
+
+struct StreamBatch {
+  uint64_t batch_size = 0;
+  std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+};
+
+void PutKinesisStream::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  logger_->log_trace("PutKinesisStream onTrigger");
+
+  constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB;
+  std::unordered_map<std::string, StreamBatch> stream_batches;
+  auto credentials = getAWSCredentials(context, nullptr);
+
+  if (!credentials) {
+    logger_->log_error("Failed to get credentials for PutKinesisStream");
+    context.yield();
+    return;
+  }
+
+  for (uint64_t i = 0; i < batch_size_; i++) {
+    std::shared_ptr<core::FlowFile> flow_file = session.get();
+    if (!flow_file) { break; }
+    const auto flow_file_size = flow_file->getSize();
+    if (flow_file_size > SINGLE_RECORD_MAX_SIZE) {
+      flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("record 
too big {}, max allowed {}", flow_file_size, SINGLE_RECORD_MAX_SIZE));
+      session.transfer(flow_file, Failure);
+      logger_->log_error("Failed to publish to kinesis record {} because the 
size was greater than {} bytes", flow_file->getUUID().to_string(), 
SINGLE_RECORD_MAX_SIZE);
+      continue;
+    }
+
+    auto stream_name = context.getProperty(AmazonKinesisStreamName.name, 
flow_file.get());
+    if (!stream_name) {
+      logger_->log_error("Stream name is invalid due to {}", 
stream_name.error().message());
+      session.transfer(flow_file, Failure);
+      continue;
+    }
+    auto partition_key = 
context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get())
+        | minifi::utils::valueOrElse([&flow_file]() -> std::string { return 
flow_file->getUUID().to_string(); });
+
+    stream_batches[*stream_name].flow_files.push_back(std::move(flow_file));
+    stream_batches[*stream_name].batch_size += flow_file_size;
+
+    if (stream_batches[*stream_name].batch_size > batch_data_size_soft_cap_) {
+      break;
+    }
+  }
+

Review Comment:
   if `stream_batches` is empty, we could return here, before `getClient()`
   
   we could yield, as well, although I'm not sure if we want to do that if 
there were incoming flow files but they were all routed to Failure



##########
extensions/aws/processors/AwsProcessor.h:
##########
@@ -0,0 +1,188 @@
+/**
+ * @file S3Processor.h
+ * Base S3 processor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <array>
+#include <memory>
+#include <optional>
+#include <set>
+#include <string>
+#include <string_view>
+#include <utility>
+
+#include "aws/core/auth/AWSCredentialsProvider.h"
+#include "AWSCredentialsProvider.h"
+#include "utils/ProxyOptions.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "minifi-cpp/core/PropertyValidator.h"
+#include "core/Processor.h"
+
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+namespace region {
+inline constexpr std::string_view AF_SOUTH_1 = "af-south-1";
+inline constexpr std::string_view AP_EAST_1 = "ap-east-1";
+inline constexpr std::string_view AP_NORTHEAST_1 = "ap-northeast-1";
+inline constexpr std::string_view AP_NORTHEAST_2 = "ap-northeast-2";
+inline constexpr std::string_view AP_NORTHEAST_3 = "ap-northeast-3";
+inline constexpr std::string_view AP_SOUTH_1 = "ap-south-1";
+inline constexpr std::string_view AP_SOUTH_2 = "ap-south-2";
+inline constexpr std::string_view AP_SOUTHEAST_1 = "ap-southeast-1";
+inline constexpr std::string_view AP_SOUTHEAST_2 = "ap-southeast-2";
+inline constexpr std::string_view AP_SOUTHEAST_3 = "ap-southeast-3";
+inline constexpr std::string_view AP_SOUTHEAST_4 = "ap-southeast-4";
+inline constexpr std::string_view AP_SOUTHEAST_5 = "ap-southeast-5";
+inline constexpr std::string_view AP_SOUTHEAST_7 = "ap-southeast-7";
+inline constexpr std::string_view CA_CENTRAL_1 = "ca-central-1";
+inline constexpr std::string_view CA_WEST_1 = "ca-west-1";
+inline constexpr std::string_view CN_NORTH_1 = "cn-north-1";
+inline constexpr std::string_view CN_NORTHWEST_1 = "cn-northwest-1";
+inline constexpr std::string_view EU_CENTRAL_1 = "eu-central-1";
+inline constexpr std::string_view EU_CENTRAL_2 = "eu-central-2";
+inline constexpr std::string_view EU_ISOE_WEST_1 = "eu-isoe-west-1";
+inline constexpr std::string_view EU_NORTH_1 = "eu-north-1";
+inline constexpr std::string_view EU_SOUTH_1 = "eu-south-1";
+inline constexpr std::string_view EU_SOUTH_2 = "eu-south-2";
+inline constexpr std::string_view EU_WEST_1 = "eu-west-1";
+inline constexpr std::string_view EU_WEST_2 = "eu-west-2";
+inline constexpr std::string_view EU_WEST_3 = "eu-west-3";
+inline constexpr std::string_view IL_CENTRAL_1 = "il-central-1";
+inline constexpr std::string_view ME_CENTRAL_1 = "me-central-1";
+inline constexpr std::string_view ME_SOUTH_1 = "me-south-1";
+inline constexpr std::string_view MX_CENTRAL_1 = "mx-central-1";
+inline constexpr std::string_view SA_EAST_1 = "sa-east-1";
+inline constexpr std::string_view US_EAST_1 = "us-east-1";
+inline constexpr std::string_view US_EAST_2 = "us-east-2";
+inline constexpr std::string_view US_GOV_EAST_1 = "us-gov-east-1";
+inline constexpr std::string_view US_GOV_WEST_1 = "us-gov-west-1";
+inline constexpr std::string_view US_ISO_EAST_1 = "us-iso-east-1";
+inline constexpr std::string_view US_ISO_WEST_1 = "us-iso-west-1";
+inline constexpr std::string_view US_ISOB_EAST_1 = "us-isob-east-1";
+inline constexpr std::string_view US_ISOF_EAST_1 = "us-isof-east-1";
+inline constexpr std::string_view US_ISOF_SOUTH_1 = "us-isof-south-1";
+inline constexpr std::string_view US_WEST_1 = "us-west-1";
+inline constexpr std::string_view US_WEST_2 = "us-west-2";
+
+inline constexpr auto REGIONS = std::array{
+  AF_SOUTH_1, AP_EAST_1, AP_NORTHEAST_1, AP_NORTHEAST_2, AP_NORTHEAST_3, 
AP_SOUTH_1, AP_SOUTH_2, AP_SOUTHEAST_1, AP_SOUTHEAST_2, AP_SOUTHEAST_3, 
AP_SOUTHEAST_4, AP_SOUTHEAST_5, AP_SOUTHEAST_7,
+  CA_CENTRAL_1, CA_WEST_1, CN_NORTH_1, CN_NORTHWEST_1, EU_CENTRAL_1, 
EU_CENTRAL_2, EU_ISOE_WEST_1, EU_NORTH_1, EU_SOUTH_1, EU_SOUTH_2, EU_WEST_1, 
EU_WEST_2, EU_WEST_3, IL_CENTRAL_1, ME_CENTRAL_1,
+  ME_SOUTH_1, MX_CENTRAL_1, SA_EAST_1, US_EAST_1, US_EAST_2, US_GOV_EAST_1, 
US_GOV_WEST_1, US_ISO_EAST_1, US_ISO_WEST_1, US_ISOB_EAST_1, US_ISOF_EAST_1, 
US_ISOF_SOUTH_1, US_WEST_1, US_WEST_2
+};
+}  // namespace region
+
+struct CommonProperties {
+  Aws::Auth::AWSCredentials credentials;
+  aws::ProxyOptions proxy;
+  std::string endpoint_override_url;
+};
+
+class AwsProcessor : public core::ProcessorImpl {
+ public:
+  EXTENSIONAPI static constexpr auto AccessKey = 
core::PropertyDefinitionBuilder<>::createProperty("Access Key")
+      .withDescription("AWS account access key")
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto SecretKey = 
core::PropertyDefinitionBuilder<>::createProperty("Secret Key")
+      .withDescription("AWS account secret key")
+      .supportsExpressionLanguage(true)
+      .isSensitive(true)
+      .build();
+  EXTENSIONAPI static constexpr auto CredentialsFile = 
core::PropertyDefinitionBuilder<>::createProperty("Credentials File")
+      .withDescription("Path to a file containing AWS access key and secret 
key in properties file format. Properties used: accessKey and secretKey")
+      .build();
+  EXTENSIONAPI static constexpr auto AWSCredentialsProviderService = 
core::PropertyDefinitionBuilder<>::createProperty("AWS Credentials Provider 
service")
+      .withDescription("The name of the AWS Credentials Provider controller 
service that is used to obtain AWS credentials.")
+      .build();
+  EXTENSIONAPI static constexpr auto Region = 
core::PropertyDefinitionBuilder<region::REGIONS.size()>::createProperty("Region")
+      .isRequired(true)
+      .withDefaultValue(region::US_WEST_2)
+      .withAllowedValues(region::REGIONS)
+      .withDescription("AWS Region")
+      .build();
+  EXTENSIONAPI static constexpr auto CommunicationsTimeout = 
core::PropertyDefinitionBuilder<>::createProperty("Communications Timeout")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
+      .withDefaultValue("30 sec")
+      .withDescription("Sets the timeout of the communication between the AWS 
server and the client")
+      .build();
+  EXTENSIONAPI static constexpr auto EndpointOverrideURL = 
core::PropertyDefinitionBuilder<>::createProperty("Endpoint Override URL")
+      .withDescription("Endpoint URL to use instead of the AWS default 
including scheme, host, "
+          "port, and path. The AWS libraries select an endpoint URL based on 
the AWS "
+          "region, but this property overrides the selected endpoint URL, 
allowing use "
+          "with other S3-compatible endpoints.")
+      .supportsExpressionLanguage(true)
+      .build();

Review Comment:
   this used to have a non-blank validator; did it get lost during a rebase?



##########
extensions/aws/processors/AwsProcessor.h:
##########
@@ -0,0 +1,188 @@
+/**
+ * @file S3Processor.h
+ * Base S3 processor class declaration
+ *

Review Comment:
   this `@file S3Processor.h` doc header should either be updated to match `AwsProcessor.h` or deleted



##########
extensions/aws/processors/PutKinesisStream.cpp:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutKinesisStream.h"
+
+#include <memory>
+#include <random>
+#include <unordered_map>
+
+#include "aws/kinesis/KinesisClient.h"
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+void PutKinesisStream::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void PutKinesisStream::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& session_factory) {
+  AwsProcessor::onSchedule(context, session_factory);
+
+  batch_size_ = parseU64Property(context, MessageBatchSize);
+  if (batch_size_ == 0 || batch_size_ > 500) {
+    logger_->log_warn("PutKinesisStream::MessageBatchSize is invalid. Setting 
it to the maximum 500 value.");
+    batch_size_ = 500;
+  }
+  batch_data_size_soft_cap_ = parseDataSizeProperty(context, MaxBatchDataSize);
+  if (batch_data_size_soft_cap_ > 4_MB) {
+    logger_->log_warn("PutKinesisStream::MaxMessageBufferSize is invalid. 
Setting it to the maximum 4 MB value.");
+    batch_data_size_soft_cap_ = 4_MB;
+  }
+
+  endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | 
minifi::utils::toOptional();
+}
+
+struct StreamBatch {
+  uint64_t batch_size = 0;
+  std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+};
+
+void PutKinesisStream::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  logger_->log_trace("PutKinesisStream onTrigger");
+
+  constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB;

Review Comment:
   nitpick: should be `static constexpr`



##########
extensions/aws/tests/PutKinesisStreamTests.cpp:
##########
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <minifi-cpp/core/FlowFile.h>
+
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/Resource.h"
+#include "processors/PutKinesisStream.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
+
+namespace org::apache::nifi::minifi::aws::processors::test {
+
+class MockKinesisClient final : public Aws::Kinesis::KinesisClient {
+  Aws::Kinesis::Model::PutRecordsOutcome PutRecords(const 
Aws::Kinesis::Model::PutRecordsRequest& request) const override {
+    Aws::Kinesis::Model::PutRecordsResult result;
+    for ([[maybe_unused]] const auto& request_entry : request.GetRecords()) {
+      Aws::Kinesis::Model::PutRecordsResultEntry result_entry;
+      result_entry.SetSequenceNumber(fmt::format("sequence_number_{}", 
++sequence_number_));
+      result_entry.SetShardId("shard_id");
+      result.AddRecords(result_entry);
+    }
+    return result;
+  }
+
+  mutable uint32_t sequence_number_ = 0;
+};
+
+class PutKinesisStreamMocked final : public aws::processors::PutKinesisStream {
+ public:
+  static constexpr const char* Description = "PutKinesisStreamMocked";
+
+  explicit PutKinesisStreamMocked(const std::string& name, const 
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+  : PutKinesisStream(name, uuid) {
+  }
+
+  PutKinesisStreamMocked(const PutKinesisStreamMocked&) = delete;
+  PutKinesisStreamMocked(PutKinesisStreamMocked&&) = delete;
+  PutKinesisStreamMocked& operator=(const PutKinesisStreamMocked&) = delete;
+  PutKinesisStreamMocked& operator=(PutKinesisStreamMocked&&) = delete;
+
+  ~PutKinesisStreamMocked() override = default;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  std::unique_ptr<Aws::Kinesis::KinesisClient> getClient(const 
Aws::Auth::AWSCredentials&) override {
+    return std::make_unique<MockKinesisClient>();
+  }
+};
+REGISTER_RESOURCE(PutKinesisStreamMocked, Processor);
+
+TEST_CASE("PutKinesisStream simple happy path") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+
+
+  const auto result = controller.trigger({{.content = "foo"}, {.content = 
"bar"}});
+  CHECK(result.at(PutKinesisStream::Failure).empty());
+  CHECK(result.at(PutKinesisStream::Success).size() == 2);
+  const auto res_ff_1 = result.at(PutKinesisStream::Success).at(0);
+  const auto res_ff_2 = result.at(PutKinesisStream::Success).at(1);
+
+  CHECK(controller.plan->getContent(res_ff_1) == "foo");
+  CHECK(controller.plan->getContent(res_ff_2) == "bar");
+
+  
CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) 
== "sequence_number_1");
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == 
"shard_id");
+  
CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) 
== "sequence_number_2");
+  CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == 
"shard_id");
+}
+
+TEST_CASE("PutKinesisStream smaller batch size than available ffs") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::MessageBatchSize, "10");
+
+  const auto result = controller.trigger({
+    {.content = "Lorem"},
+    {.content = "ipsum"},
+    {.content = "dolor"},
+    {.content = "sit"},
+    {.content = "amet"},
+    {.content = "consectetur"},
+    {.content = "adipiscing"},
+    {.content = "elit"},
+    {.content = "Morbi"},
+    {.content = "dapibus"},
+    {.content = "risus"},
+    {.content = "a"},
+    {.content = "bibendum"},
+    {.content = "luctus"}});
+
+  CHECK(result.at(PutKinesisStream::Success).size() == 10);
+}
+
+TEST_CASE("PutKinesisStream max batch data size fills up") {
+  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::MessageBatchSize, "10");
+  controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::MaxBatchDataSize, "12 B");
+
+  const auto result = controller.trigger({
+    {.content = "Lorem"},
+    {.content = "ipsum"},
+    {.content = "dolor"},
+    {.content = "sit"},
+    {.content = "amet"},
+    {.content = "consectetur"},
+    {.content = "adipiscing"},
+    {.content = "elit"},
+    {.content = "Morbi"},
+    {.content = "dapibus"},
+    {.content = "risus"},
+    {.content = "a"},
+    {.content = "bibendum"},
+    {.content = "luctus"}});
+
+  CHECK(result.at(PutKinesisStream::Success).size() == 3);

Review Comment:
   we could check the content of the last flow file, to document whether all of 
it got processed, or only up to the max batch data size



##########
extensions/aws/processors/PutKinesisStream.h:
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+
+#include <memory>
+#include <optional>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "S3Processor.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "utils/ArrayUtils.h"
+#include "aws/kinesis/KinesisClient.h"
+
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+class PutKinesisStream : public AwsProcessor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "Sends the contents 
to a specified Amazon Kinesis. In order to send data to Kinesis, the stream 
name has to be specified.";
+
+  EXTENSIONAPI static constexpr auto AmazonKinesisStreamName = 
core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream Name")
+      .withDescription("The name of Kinesis Stream")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto AmazonKinesisStreamPartitionKey = 
core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream 
Partition Key")
+      .withDescription("The partition key attribute. If it is not set, a 
random value is used")
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto MessageBatchSize = 
core::PropertyDefinitionBuilder<>::createProperty("Batch Size")
+      .withDescription("Batch size for messages. [1-500]")
+      
.withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
+      .withDefaultValue("250")
+      .build();
+  EXTENSIONAPI static constexpr auto MaxBatchDataSize = 
core::PropertyDefinitionBuilder<>::createProperty("Max Batch Data Size")
+      .withDescription("Soft cap on the data size of the batch to a single 
stream. (max 4MB)")
+      .withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)
+      .withDefaultValue("1 MB")
+      .build();
+
+  EXTENSIONAPI static constexpr auto Properties = 
minifi::utils::array_cat(AwsProcessor::Properties, 
std::to_array<core::PropertyReference>({
+    AmazonKinesisStreamName, AmazonKinesisStreamPartitionKey, 
MessageBatchSize, MaxBatchDataSize
+  }));
+
+  EXTENSIONAPI static constexpr auto Success = 
core::RelationshipDefinition{"success", "FlowFiles are routed to success 
relationship"};
+  EXTENSIONAPI static constexpr auto Failure = 
core::RelationshipDefinition{"failure", "FlowFiles are routed to failure 
relationship"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Success, 
Failure};
+
+  EXTENSIONAPI static constexpr auto AwsKinesisErrorMessage = 
core::OutputAttributeDefinition<>{"aws.kinesis.error.message", { Failure },
+    "Error message on posting message to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto AwsKinesisErrorCode = 
core::OutputAttributeDefinition<>{"aws.kinesis.error.code", { Failure },
+    "Error code for the message when posting to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto AwsKinesisSequenceNumber = 
core::OutputAttributeDefinition<>{"aws.kinesis.sequence.number", { Success },
+    "Sequence number for the message when posting to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto AwsKinesisShardId = 
core::OutputAttributeDefinition<>{"aws.kinesis.shard.id", { Success },
+    "Shard id of the message posted to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto OutputAttributes = 
std::array<core::OutputAttributeReference, 4>{AwsKinesisErrorMessage, 
AwsKinesisErrorCode, AwsKinesisSequenceNumber, AwsKinesisShardId};
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;

Review Comment:
   This processor does not seem to support dynamic properties, so this should probably be `false`.



##########
extensions/aws/processors/PutKinesisStream.h:
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+
+#include <memory>
+#include <optional>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "S3Processor.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "utils/ArrayUtils.h"
+#include "aws/kinesis/KinesisClient.h"
+
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+class PutKinesisStream : public AwsProcessor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "Sends the contents 
to a specified Amazon Kinesis. In order to send data to Kinesis, the stream 
name has to be specified.";
+
+  EXTENSIONAPI static constexpr auto AmazonKinesisStreamName = 
core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream Name")
+      .withDescription("The name of Kinesis Stream")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto AmazonKinesisStreamPartitionKey = 
core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream 
Partition Key")
+      .withDescription("The partition key attribute. If it is not set, a 
random value is used")
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto MessageBatchSize = 
core::PropertyDefinitionBuilder<>::createProperty("Batch Size")
+      .withDescription("Batch size for messages. [1-500]")
+      
.withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
+      .withDefaultValue("250")
+      .build();
+  EXTENSIONAPI static constexpr auto MaxBatchDataSize = 
core::PropertyDefinitionBuilder<>::createProperty("Max Batch Data Size")
+      .withDescription("Soft cap on the data size of the batch to a single 
stream. (max 4MB)")
+      .withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)
+      .withDefaultValue("1 MB")
+      .build();
+
+  EXTENSIONAPI static constexpr auto Properties = 
minifi::utils::array_cat(AwsProcessor::Properties, 
std::to_array<core::PropertyReference>({
+    AmazonKinesisStreamName, AmazonKinesisStreamPartitionKey, 
MessageBatchSize, MaxBatchDataSize
+  }));
+
+  EXTENSIONAPI static constexpr auto Success = 
core::RelationshipDefinition{"success", "FlowFiles are routed to success 
relationship"};
+  EXTENSIONAPI static constexpr auto Failure = 
core::RelationshipDefinition{"failure", "FlowFiles are routed to failure 
relationship"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Success, 
Failure};
+
+  EXTENSIONAPI static constexpr auto AwsKinesisErrorMessage = 
core::OutputAttributeDefinition<>{"aws.kinesis.error.message", { Failure },
+    "Error message on posting message to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto AwsKinesisErrorCode = 
core::OutputAttributeDefinition<>{"aws.kinesis.error.code", { Failure },
+    "Error code for the message when posting to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto AwsKinesisSequenceNumber = 
core::OutputAttributeDefinition<>{"aws.kinesis.sequence.number", { Success },
+    "Sequence number for the message when posting to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto AwsKinesisShardId = 
core::OutputAttributeDefinition<>{"aws.kinesis.shard.id", { Success },
+    "Shard id of the message posted to AWS Kinesis"};
+  EXTENSIONAPI static constexpr auto OutputAttributes = 
std::array<core::OutputAttributeReference, 4>{AwsKinesisErrorMessage, 
AwsKinesisErrorCode, AwsKinesisSequenceNumber, AwsKinesisShardId};

Review Comment:
   We could use `std::to_array` here, too, like we do at `Properties`.



##########
extensions/aws/processors/PutKinesisStream.cpp:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutKinesisStream.h"
+
+#include <memory>
+#include <random>
+#include <unordered_map>
+
+#include "aws/kinesis/KinesisClient.h"
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+void PutKinesisStream::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void PutKinesisStream::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& session_factory) {
+  AwsProcessor::onSchedule(context, session_factory);
+
+  batch_size_ = parseU64Property(context, MessageBatchSize);
+  if (batch_size_ == 0 || batch_size_ > 500) {
+    logger_->log_warn("PutKinesisStream::MessageBatchSize is invalid. Setting 
it to the maximum 500 value.");
+    batch_size_ = 500;
+  }
+  batch_data_size_soft_cap_ = parseDataSizeProperty(context, MaxBatchDataSize);
+  if (batch_data_size_soft_cap_ > 4_MB) {
+    logger_->log_warn("PutKinesisStream::MaxMessageBufferSize is invalid. 
Setting it to the maximum 4 MB value.");
+    batch_data_size_soft_cap_ = 4_MB;
+  }
+
+  endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | 
minifi::utils::toOptional();
+}
+
+struct StreamBatch {
+  uint64_t batch_size = 0;
+  std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+};
+
+void PutKinesisStream::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  logger_->log_trace("PutKinesisStream onTrigger");
+
+  constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB;
+  std::unordered_map<std::string, StreamBatch> stream_batches;
+  auto credentials = getAWSCredentials(context, nullptr);
+
+  if (!credentials) {
+    logger_->log_error("Failed to get credentials for PutKinesisStream");
+    context.yield();
+    return;
+  }
+
+  for (uint64_t i = 0; i < batch_size_; i++) {
+    std::shared_ptr<core::FlowFile> flow_file = session.get();
+    if (!flow_file) { break; }
+    const auto flow_file_size = flow_file->getSize();
+    if (flow_file_size > SINGLE_RECORD_MAX_SIZE) {
+      flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("record 
too big {}, max allowed {}", flow_file_size, SINGLE_RECORD_MAX_SIZE));
+      session.transfer(flow_file, Failure);
+      logger_->log_error("Failed to publish to kinesis record {} because the 
size was greater than {} bytes", flow_file->getUUID().to_string(), 
SINGLE_RECORD_MAX_SIZE);
+      continue;
+    }
+
+    auto stream_name = context.getProperty(AmazonKinesisStreamName.name, 
flow_file.get());
+    if (!stream_name) {
+      logger_->log_error("Stream name is invalid due to {}", 
stream_name.error().message());
+      session.transfer(flow_file, Failure);
+      continue;
+    }
+    auto partition_key = 
context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get())
+        | minifi::utils::valueOrElse([&flow_file]() -> std::string { return 
flow_file->getUUID().to_string(); });

Review Comment:
   This `partition_key` is unused and can be deleted.



##########
extensions/aws/processors/PutKinesisStream.cpp:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutKinesisStream.h"
+
+#include <memory>
+#include <random>
+#include <unordered_map>
+
+#include "aws/kinesis/KinesisClient.h"
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+void PutKinesisStream::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void PutKinesisStream::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& session_factory) {
+  AwsProcessor::onSchedule(context, session_factory);
+
+  batch_size_ = parseU64Property(context, MessageBatchSize);
+  if (batch_size_ == 0 || batch_size_ > 500) {
+    logger_->log_warn("PutKinesisStream::MessageBatchSize is invalid. Setting 
it to the maximum 500 value.");
+    batch_size_ = 500;
+  }
+  batch_data_size_soft_cap_ = parseDataSizeProperty(context, MaxBatchDataSize);
+  if (batch_data_size_soft_cap_ > 4_MB) {
+    logger_->log_warn("PutKinesisStream::MaxMessageBufferSize is invalid. 
Setting it to the maximum 4 MB value.");
+    batch_data_size_soft_cap_ = 4_MB;
+  }
+
+  endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | 
minifi::utils::toOptional();
+}
+
+struct StreamBatch {
+  uint64_t batch_size = 0;
+  std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+};
+
+void PutKinesisStream::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  logger_->log_trace("PutKinesisStream onTrigger");
+
+  constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB;
+  std::unordered_map<std::string, StreamBatch> stream_batches;
+  auto credentials = getAWSCredentials(context, nullptr);
+
+  if (!credentials) {
+    logger_->log_error("Failed to get credentials for PutKinesisStream");
+    context.yield();
+    return;
+  }
+
+  for (uint64_t i = 0; i < batch_size_; i++) {
+    std::shared_ptr<core::FlowFile> flow_file = session.get();
+    if (!flow_file) { break; }
+    const auto flow_file_size = flow_file->getSize();
+    if (flow_file_size > SINGLE_RECORD_MAX_SIZE) {
+      flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("record 
too big {}, max allowed {}", flow_file_size, SINGLE_RECORD_MAX_SIZE));
+      session.transfer(flow_file, Failure);
+      logger_->log_error("Failed to publish to kinesis record {} because the 
size was greater than {} bytes", flow_file->getUUID().to_string(), 
SINGLE_RECORD_MAX_SIZE);
+      continue;
+    }
+
+    auto stream_name = context.getProperty(AmazonKinesisStreamName.name, 
flow_file.get());
+    if (!stream_name) {
+      logger_->log_error("Stream name is invalid due to {}", 
stream_name.error().message());
+      session.transfer(flow_file, Failure);
+      continue;
+    }
+    auto partition_key = 
context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get())
+        | minifi::utils::valueOrElse([&flow_file]() -> std::string { return 
flow_file->getUUID().to_string(); });
+
+    stream_batches[*stream_name].flow_files.push_back(std::move(flow_file));
+    stream_batches[*stream_name].batch_size += flow_file_size;
+
+    if (stream_batches[*stream_name].batch_size > batch_data_size_soft_cap_) {
+      break;
+    }
+  }
+
+  std::unique_ptr<Aws::Kinesis::KinesisClient> kinesis_client = 
getClient(*credentials);
+
+  for (const auto& [stream_name, stream_batch]: stream_batches) {
+    Aws::Kinesis::Model::PutRecordsRequest request;
+    request.SetStreamName(stream_name);
+    Aws::Vector<Aws::Kinesis::Model::PutRecordsRequestEntry> records;
+    for (const auto& flow_file : stream_batch.flow_files) {
+      Aws::Kinesis::Model::PutRecordsRequestEntry entry;
+      const auto partition_key = 
context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get()) | 
minifi::utils::valueOrElse([&flow_file] { return 
flow_file->getUUID().to_string(); });
+      entry.SetPartitionKey(partition_key);
+      const auto [status, buffer] = session.readBuffer(flow_file);

Review Comment:
   Should we check that `io::isError(status)` is false?



##########
extensions/aws/processors/PutKinesisStream.cpp:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutKinesisStream.h"
+
+#include <memory>
+#include <random>
+#include <unordered_map>
+
+#include "aws/kinesis/KinesisClient.h"
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+void PutKinesisStream::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void PutKinesisStream::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& session_factory) {
+  AwsProcessor::onSchedule(context, session_factory);
+
+  batch_size_ = parseU64Property(context, MessageBatchSize);
+  if (batch_size_ == 0 || batch_size_ > 500) {
+    logger_->log_warn("PutKinesisStream::MessageBatchSize is invalid. Setting 
it to the maximum 500 value.");
+    batch_size_ = 500;
+  }
+  batch_data_size_soft_cap_ = parseDataSizeProperty(context, MaxBatchDataSize);
+  if (batch_data_size_soft_cap_ > 4_MB) {
+    logger_->log_warn("PutKinesisStream::MaxMessageBufferSize is invalid. 
Setting it to the maximum 4 MB value.");
+    batch_data_size_soft_cap_ = 4_MB;
+  }
+
+  endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | 
minifi::utils::toOptional();
+}
+
+struct StreamBatch {
+  uint64_t batch_size = 0;
+  std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+};
+
+void PutKinesisStream::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  logger_->log_trace("PutKinesisStream onTrigger");
+
+  constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB;
+  std::unordered_map<std::string, StreamBatch> stream_batches;
+  auto credentials = getAWSCredentials(context, nullptr);
+
+  if (!credentials) {
+    logger_->log_error("Failed to get credentials for PutKinesisStream");
+    context.yield();
+    return;
+  }
+
+  for (uint64_t i = 0; i < batch_size_; i++) {
+    std::shared_ptr<core::FlowFile> flow_file = session.get();
+    if (!flow_file) { break; }
+    const auto flow_file_size = flow_file->getSize();
+    if (flow_file_size > SINGLE_RECORD_MAX_SIZE) {
+      flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("record 
too big {}, max allowed {}", flow_file_size, SINGLE_RECORD_MAX_SIZE));
+      session.transfer(flow_file, Failure);
+      logger_->log_error("Failed to publish to kinesis record {} because the 
size was greater than {} bytes", flow_file->getUUID().to_string(), 
SINGLE_RECORD_MAX_SIZE);
+      continue;
+    }
+
+    auto stream_name = context.getProperty(AmazonKinesisStreamName.name, 
flow_file.get());
+    if (!stream_name) {
+      logger_->log_error("Stream name is invalid due to {}", 
stream_name.error().message());
+      session.transfer(flow_file, Failure);
+      continue;
+    }
+    auto partition_key = 
context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get())
+        | minifi::utils::valueOrElse([&flow_file]() -> std::string { return 
flow_file->getUUID().to_string(); });
+
+    stream_batches[*stream_name].flow_files.push_back(std::move(flow_file));
+    stream_batches[*stream_name].batch_size += flow_file_size;
+
+    if (stream_batches[*stream_name].batch_size > batch_data_size_soft_cap_) {
+      break;
+    }
+  }
+
+  std::unique_ptr<Aws::Kinesis::KinesisClient> kinesis_client = 
getClient(*credentials);
+
+  for (const auto& [stream_name, stream_batch]: stream_batches) {
+    Aws::Kinesis::Model::PutRecordsRequest request;
+    request.SetStreamName(stream_name);
+    Aws::Vector<Aws::Kinesis::Model::PutRecordsRequestEntry> records;
+    for (const auto& flow_file : stream_batch.flow_files) {
+      Aws::Kinesis::Model::PutRecordsRequestEntry entry;
+      const auto partition_key = 
context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get()) | 
minifi::utils::valueOrElse([&flow_file] { return 
flow_file->getUUID().to_string(); });
+      entry.SetPartitionKey(partition_key);
+      const auto [status, buffer] = session.readBuffer(flow_file);
+      Aws::Utils::ByteBuffer aws_buffer(reinterpret_cast<const unsigned 
char*>(buffer.data()), buffer.size());
+      entry.SetData(aws_buffer);
+      records.push_back(entry);
+    }
+    request.SetRecords(records);
+
+    const auto outcome = kinesis_client->PutRecords(request);
+
+    if (!outcome.IsSuccess()) {
+      for (const auto& flow_file : stream_batch.flow_files) {
+        flow_file->addAttribute(AwsKinesisErrorMessage.name, 
outcome.GetError().GetMessage());
+        flow_file->addAttribute(AwsKinesisErrorCode.name, 
std::to_string(static_cast<int>(outcome.GetError().GetErrorType())));
+        session.transfer(flow_file, Failure);
+      }
+    } else {
+      const auto result_records = outcome.GetResult().GetRecords();
+      if (result_records.size() != stream_batch.flow_files.size()) {
+        logger_->log_critical("PutKinesisStream record size mismatch cannot 
tell which record succeeded and which didnt");

Review Comment:
   Logging the two sizes could help with troubleshooting; the stream name could
also be useful.



##########
extensions/aws/processors/PutKinesisStream.cpp:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutKinesisStream.h"
+
+#include <memory>
+#include <random>
+#include <unordered_map>
+
+#include "aws/kinesis/KinesisClient.h"
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::aws::processors {
+
+void PutKinesisStream::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void PutKinesisStream::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& session_factory) {
+  AwsProcessor::onSchedule(context, session_factory);
+
+  batch_size_ = parseU64Property(context, MessageBatchSize);
+  if (batch_size_ == 0 || batch_size_ > 500) {
+    logger_->log_warn("PutKinesisStream::MessageBatchSize is invalid. Setting 
it to the maximum 500 value.");
+    batch_size_ = 500;
+  }
+  batch_data_size_soft_cap_ = parseDataSizeProperty(context, MaxBatchDataSize);
+  if (batch_data_size_soft_cap_ > 4_MB) {
+    logger_->log_warn("PutKinesisStream::MaxMessageBufferSize is invalid. 
Setting it to the maximum 4 MB value.");
+    batch_data_size_soft_cap_ = 4_MB;
+  }
+
+  endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | 
minifi::utils::toOptional();
+}
+
+struct StreamBatch {
+  uint64_t batch_size = 0;
+  std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+};
+
+void PutKinesisStream::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  logger_->log_trace("PutKinesisStream onTrigger");
+
+  constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB;
+  std::unordered_map<std::string, StreamBatch> stream_batches;
+  auto credentials = getAWSCredentials(context, nullptr);
+
+  if (!credentials) {
+    logger_->log_error("Failed to get credentials for PutKinesisStream");
+    context.yield();
+    return;
+  }
+
+  for (uint64_t i = 0; i < batch_size_; i++) {
+    std::shared_ptr<core::FlowFile> flow_file = session.get();
+    if (!flow_file) { break; }
+    const auto flow_file_size = flow_file->getSize();
+    if (flow_file_size > SINGLE_RECORD_MAX_SIZE) {
+      flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("record 
too big {}, max allowed {}", flow_file_size, SINGLE_RECORD_MAX_SIZE));
+      session.transfer(flow_file, Failure);
+      logger_->log_error("Failed to publish to kinesis record {} because the 
size was greater than {} bytes", flow_file->getUUID().to_string(), 
SINGLE_RECORD_MAX_SIZE);
+      continue;
+    }
+
+    auto stream_name = context.getProperty(AmazonKinesisStreamName.name, 
flow_file.get());
+    if (!stream_name) {
+      logger_->log_error("Stream name is invalid due to {}", 
stream_name.error().message());
+      session.transfer(flow_file, Failure);
+      continue;
+    }

Review Comment:
   We could set the `aws.kinesis.error.message` attribute here, too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to