This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 6611bea39ca116c328ea4242a46aae104a7409a3 Author: Martin Zink <[email protected]> AuthorDate: Tue Feb 4 12:52:22 2025 +0100 MINIFICPP-2518 PutKinesisStream Signed-off-by: Ferenc Gerlits <[email protected]> Closes #1927 --- PROCESSORS.md | 47 +++ README.md | 2 +- cmake/BundledAwsSdkCpp.cmake | 14 +- docker/test/integration/cluster/ContainerStore.py | 9 + .../test/integration/cluster/DockerTestCluster.py | 4 + docker/test/integration/cluster/ImageStore.py | 5 + .../integration/cluster/checkers/AwsChecker.py | 5 + .../cluster/containers/KinesisServerContainer.py | 41 +++ .../features/MiNiFi_integration_test_driver.py | 3 + docker/test/integration/features/kinesis.feature | 38 +++ docker/test/integration/features/steps/steps.py | 11 + .../minifi/processors/PutKinesisStream.py | 42 +++ .../integration/resources/kinesis-mock/Dockerfile | 8 + .../resources/kinesis-mock/consumer/consumer.js | 64 +++++ .../resources/kinesis-mock/consumer/package.json | 3 + .../integration/resources/kinesis-mock/server.json | 6 + extensions/aws/AWSCredentialsProvider.cpp | 2 - extensions/aws/AWSCredentialsProvider.h | 2 - extensions/aws/CMakeLists.txt | 10 +- .../{S3Processor.cpp => AwsProcessor.cpp} | 49 +--- .../processors/{S3Processor.h => AwsProcessor.h} | 35 +-- extensions/aws/processors/DeleteS3Object.cpp | 21 +- extensions/aws/processors/DeleteS3Object.h | 5 +- extensions/aws/processors/FetchS3Object.cpp | 19 +- extensions/aws/processors/FetchS3Object.h | 5 +- extensions/aws/processors/ListS3.cpp | 6 +- extensions/aws/processors/PutKinesisStream.cpp | 208 ++++++++++++++ extensions/aws/processors/PutKinesisStream.h | 139 +++++++++ extensions/aws/processors/PutS3Object.cpp | 35 ++- extensions/aws/processors/PutS3Object.h | 11 +- extensions/aws/processors/S3Processor.cpp | 113 +------- extensions/aws/processors/S3Processor.h | 155 +--------- extensions/aws/s3/S3ClientRequestSender.cpp | 2 - extensions/aws/s3/S3ClientRequestSender.h | 2 - extensions/aws/s3/S3RequestSender.h | 9 - extensions/aws/s3/S3Wrapper.cpp | 2 - extensions/aws/s3/S3Wrapper.h | 24 +- extensions/aws/tests/PutKinesisStreamTests.cpp | 318 +++++++++++++++++++++ extensions/aws/utils/AWSInitializer.cpp | 19 +- extensions/aws/utils/AWSInitializer.h | 2 - extensions/aws/utils/AWSSdkLogger.cpp | 2 - extensions/aws/utils/AWSSdkLogger.h | 2 - extensions/aws/utils/ProxyOptions.h | 33 +++ thirdparty/aws-sdk-cpp/dll-export-injection.patch | 185 ++++++++---- 44 files changed, 1252 insertions(+), 465 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 862a1182e..051e7377b 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -82,6 +82,7 @@ limitations under the License. - [PutCouchbaseKey](#PutCouchbaseKey) - [PutFile](#PutFile) - [PutGCSObject](#PutGCSObject) +- [PutKinesisStream](#PutKinesisStream) - [PutOPCProcessor](#PutOPCProcessor) - [PutS3Object](#PutS3Object) - [PutSFTP](#PutSFTP) @@ -2432,6 +2433,52 @@ In the list below, the names of required properties appear in bold. Any other pr | gcs.encryption.sha256 | success | The SHA256 hash of the key used to encrypt the object. | +## PutKinesisStream + +### Description + +Sends the contents to a specified Amazon Kinesis. In order to send data to Kinesis, the stream name has to be specified. + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values [...] 
+|-------------------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| Access Key | | [...] +| Secret Key | | [...] +| Credentials File | | [...] +| AWS Credentials Provider service | | [...] +| **Region** | us-west-2 | af-south-1<br/>ap-east-1<br/>ap-northeast-1<br/>ap-northeast-2<br/>ap-northeast-3<br/>ap-south-1<br/>ap-south-2<br/>ap-southeast-1<br/>ap-southeast-2<br/>ap-southeast-3<br/>ap-southeast-4<br/>ap-southeast-5<br/>ap-southeast-7<br/>ca-central-1<br/>ca-west-1<br/>cn-north-1<br/>cn-northwest-1<br/>eu-central-1<br/>eu-central-2<br/>eu-isoe-west-1<br/>eu-north-1<br/>eu-south-1<br/>eu-south-2<br/>eu-west-1<br/>eu-west-2<br/>eu-west-3<br/>i [...] +| **Communications Timeout** | 30 sec | [...] +| Endpoint Override URL | | [...] +| Proxy Host | | [...] +| Proxy Port | | [...] +| Proxy Username | | [...] +| Proxy Password | | [...] +| **Use Default Credentials** | false | true<br/>false [...] +| **Amazon Kinesis Stream Name** | | [...] +| Amazon Kinesis Stream Partition Key | | [...] +| Batch Size | 250 | [...] +| Max Batch Data Size | 1 MB | [...] + +### Relationships + +| Name | Description | +|---------|----------------------------------------------| +| success | FlowFiles are routed to success relationship | +| failure | FlowFiles are routed to failure relationship | + +### Output Attributes + +| Attribute | Relationship | Description | +|-----------------------------|--------------|-------------------------------------------------------------| +| aws.kinesis.error.message | failure | Error message on posting message to AWS Kinesis | +| aws.kinesis.error.code | failure | Error code for the message when posting to AWS Kinesis | +| aws.kinesis.sequence.number | success | Sequence number for the message when posting to AWS Kinesis | +| aws.kinesis.shard.id | success | Shard id of the message posted to AWS Kinesis | + + ## PutOPCProcessor ### Description diff --git a/README.md b/README.md index 790238afa..5e7102866 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte | Extension Set | Processors and Controller Services [...] |----------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] | Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[BinFiles](PROCESSORS.md#binfiles)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) [...] 
-| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) [...] +| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3)<br/>[PutKinesisStream](PROCESSORS.md#putkinesisstream) [...] | Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](PROCESSORS.md#deleteazureblobstorage)<br/>[FetchAzureBlobStorage](PROCESSORS.md#fetchazureblobstorage)<br/>[ListAzureBlobStorage](PROCESSORS.md#listazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](PROCESSORS.md#del [...] | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) [...] | Couchbase | [CouchbaseClusterService](CONTROLLERS.md#couchbaseclusterservice)<br/>[PutCouchbaseKey](PROCESSORS.md#putcouchbasekey)<br/>[GetCouchbaseKey](PROCESSORS.md#getcouchbasekey) [...] diff --git a/cmake/BundledAwsSdkCpp.cmake b/cmake/BundledAwsSdkCpp.cmake index b223098ac..672f73d7b 100644 --- a/cmake/BundledAwsSdkCpp.cmake +++ b/cmake/BundledAwsSdkCpp.cmake @@ -60,7 +60,9 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR) "${LIBDIR}/${PREFIX}aws-c-compression.${SUFFIX}" "${LIBDIR}/${PREFIX}aws-c-sdkutils.${SUFFIX}" "${LIBDIR}/${PREFIX}aws-cpp-sdk-core.${SUFFIX}" - "${LIBDIR}/${PREFIX}aws-cpp-sdk-s3.${SUFFIX}") + "${LIBDIR}/${PREFIX}aws-cpp-sdk-s3.${SUFFIX}" + "${LIBDIR}/${PREFIX}aws-cpp-sdk-kinesis.${SUFFIX}" + ) FOREACH(BYPRODUCT ${BYPRODUCTS}) LIST(APPEND AWSSDK_LIBRARIES_LIST "${BINARY_DIR}/thirdparty/libaws-install/${BYPRODUCT}") @@ -69,13 +71,13 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR) set(AWS_SDK_CPP_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS} -DCMAKE_PREFIX_PATH=${BINARY_DIR}/thirdparty/libaws-install -DCMAKE_INSTALL_PREFIX=${BINARY_DIR}/thirdparty/libaws-install - -DBUILD_ONLY=s3 + -DBUILD_ONLY=kinesis%s3 -DENABLE_TESTING=OFF -DBUILD_SHARED_LIBS=OFF -DENABLE_UNITY_BUILD=${AWS_ENABLE_UNITY_BUILD}) if(WIN32) - list(APPEND AWS_SDK_CPP_CMAKE_ARGS -DFORCE_EXPORT_CORE_API=ON -DFORCE_EXPORT_S3_API=ON) + list(APPEND AWS_SDK_CPP_CMAKE_ARGS -DFORCE_EXPORT_CORE_API=ON -DFORCE_EXPORT_S3_API=ON -DFORCE_EXPORT_KINESIS_API=ON) endif() append_third_party_passthrough_args(AWS_SDK_CPP_CMAKE_ARGS "${AWS_SDK_CPP_CMAKE_ARGS}") @@ -209,4 +211,10 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR) add_dependencies(AWS::aws-cpp-sdk-s3 aws-sdk-cpp-external) target_include_directories(AWS::aws-cpp-sdk-s3 INTERFACE ${LIBAWS_INCLUDE_DIR}) target_link_libraries(AWS::aws-cpp-sdk-s3 INTERFACE AWS::aws-cpp-sdk-core) + + add_library(AWS::aws-cpp-sdk-kinesis STATIC IMPORTED) + set_target_properties(AWS::aws-cpp-sdk-kinesis PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/libaws-install/${LIBDIR}/${PREFIX}aws-cpp-sdk-kinesis.${SUFFIX}") + add_dependencies(AWS::aws-cpp-sdk-kinesis aws-sdk-cpp-external) + target_include_directories(AWS::aws-cpp-sdk-kinesis INTERFACE ${LIBAWS_INCLUDE_DIR}) + target_link_libraries(AWS::aws-cpp-sdk-kinesis INTERFACE AWS::aws-cpp-sdk-core) endfunction(use_bundled_libaws) diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py 
index 79f88fb89..b73a6ac8e 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -20,6 +20,7 @@ from .containers.NifiContainer import NifiContainer from .containers.NifiContainer import NiFiOptions from .containers.ZookeeperContainer import ZookeeperContainer from .containers.KafkaBrokerContainer import KafkaBrokerContainer +from .containers.KinesisServerContainer import KinesisServerContainer from .containers.S3ServerContainer import S3ServerContainer from .containers.AzureStorageServerContainer import AzureStorageServerContainer from .containers.FakeGcsServerContainer import FakeGcsServerContainer @@ -153,6 +154,14 @@ class ContainerStore: network=self.network, image_store=self.image_store, command=command)) + elif engine == 'kinesis-server': + return self.containers.setdefault(container_name, + KinesisServerContainer(feature_context=feature_context, + name=container_name, + vols=self.vols, + network=self.network, + image_store=self.image_store, + command=command)) elif engine == 'azure-storage-server': return self.containers.setdefault(container_name, AzureStorageServerContainer(feature_context=feature_context, diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index faa42b727..6d39022e1 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -202,6 +202,10 @@ class DockerTestCluster: and output.count("TCP_MISS") >= output.count("TCP_DENIED")) or output.count("TCP_DENIED") == 0 and "TCP_MISS" in output) + def check_kinesis_server_record_data(self, container_name, record_data): + container_name = self.container_store.get_container_name_with_postfix(container_name) + return self.aws_checker.check_kinesis_server_record_data(container_name, record_data) + def check_s3_server_object_data(self, container_name, test_data): container_name = self.container_store.get_container_name_with_postfix(container_name) return self.aws_checker.check_s3_server_object_data(container_name, test_data) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index 100b1c22b..723591e4a 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -67,6 +67,8 @@ class ImageStore: image = self.__build_mqtt_broker_image() elif container_engine == "splunk": image = self.__build_splunk_image() + elif container_engine == "kinesis-server": + image = self.__build_kinesis_image() elif container_engine == "reverse-proxy": image = self.__build_reverse_proxy_image() elif container_engine == "diag-slave-tcp": @@ -276,6 +278,9 @@ class ImageStore: return self.__build_image(dockerfile) + def __build_kinesis_image(self): + return self.__build_image_by_path(self.test_dir + "/resources/kinesis-mock", 'kinesis-server') + def __build_splunk_image(self): return self.__build_image_by_path(self.test_dir + "/resources/splunk-hec", 'minifi-splunk') diff --git a/docker/test/integration/cluster/checkers/AwsChecker.py b/docker/test/integration/cluster/checkers/AwsChecker.py index 7a5e6ca71..c9e4ea057 100644 --- a/docker/test/integration/cluster/checkers/AwsChecker.py +++ b/docker/test/integration/cluster/checkers/AwsChecker.py @@ -20,6 +20,11 @@ class AwsChecker: def __init__(self, container_communicator): self.container_communicator = container_communicator + @retry_check() + def check_kinesis_server_record_data(self, 
container_name, record_data): + (code, output) = self.container_communicator.execute_command(container_name, ["node", "/app/consumer/consumer.js", record_data]) + return code == 0 + @retry_check() def check_s3_server_object_data(self, container_name, test_data): (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"]) diff --git a/docker/test/integration/cluster/containers/KinesisServerContainer.py b/docker/test/integration/cluster/containers/KinesisServerContainer.py new file mode 100644 index 000000000..0bee46cef --- /dev/null +++ b/docker/test/integration/cluster/containers/KinesisServerContainer.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +from .Container import Container + + +class KinesisServerContainer(Container): + def __init__(self, feature_context, name, vols, network, image_store, command): + super().__init__(feature_context, name, 'kinesis-server', vols, network, image_store, command) + + def get_startup_finished_log_entry(self): + return "Starting Kinesis Plain Mock Service on port 4568" + + def deploy(self): + if not self.set_deployed(): + return + + logging.info('Creating and running kinesis server docker container...') + self.client.containers.run( + self.image_store.get_image(self.get_engine()), + detach=True, + name=self.name, + network=self.network.name, + environment=["INITIALIZE_STREAMS=test_stream:3", + "LOG_LEVEL=DEBUG"], + entrypoint=self.command) + logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 71a280f7e..8f2747714 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -251,6 +251,9 @@ class MiNiFi_integration_test: assert not self.cluster.segfault_happened() or self.cluster.log_app_output() assert validator.validate() or self.cluster.log_app_output() + def check_kinesis_server_record_data(self, kinesis_container_name, record_data): + assert self.cluster.check_kinesis_server_record_data(kinesis_container_name, record_data) or self.cluster.log_app_output() + def check_s3_server_object_data(self, s3_container_name, object_data): assert self.cluster.check_s3_server_object_data(s3_container_name, object_data) or self.cluster.log_app_output() diff --git a/docker/test/integration/features/kinesis.feature b/docker/test/integration/features/kinesis.feature new file mode 100644 index 000000000..2f68fd77c --- /dev/null +++ b/docker/test/integration/features/kinesis.feature @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under 
one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +@ENABLE_AWS +Feature: Sending data from MiNiFi-C++ to an AWS Kinesis server + In order to transfer data to interact with AWS Kinesis server + As a user of MiNiFi + I need to have PutKinesisStream processor + + Background: + Given the content of "/tmp/output" is monitored + + Scenario: A MiNiFi instance can send data to AWS Kinesis + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "Schnappi, das kleine Krokodil" is present in "/tmp/input" + And a PutKinesisStream processor set up to communicate with the kinesis server + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PutKinesisStream + And the "success" relationship of the PutKinesisStream processor is connected to the PutFile + + And a kinesis server is set up in correspondence with the PutKinesisStream + + When both instances start up + + Then a flowfile with the content "Schnappi, das kleine Krokodil" is placed in the monitored directory in less than 60 seconds + And there is a record on the kinesis server with "Schnappi, das kleine Krokodil" diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 46e4e81ca..9c3bf66f6 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -128,6 +128,7 @@ def step_impl(context, processor_type, minifi_container_name): @given("a {processor_type} processor set up to communicate with a kafka broker instance") @given("a {processor_type} processor set up to communicate with an MQTT broker instance") @given("a {processor_type} processor set up to communicate with the Splunk HEC instance") +@given("a {processor_type} processor set up to communicate with the kinesis server") def step_impl(context, processor_type): __create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow") @@ -473,6 +474,11 @@ def step_impl(context): context.test.acquire_container(context=context, name="s3-server", engine="s3-server") +@given("a kinesis server is set up in correspondence with the PutKinesisStream") +def step_impl(context): + context.test.acquire_container(context=context, name="kinesis-server", engine="kinesis-server") + + # azure storage setup @given("an Azure storage server is set up") def step_impl(context): @@ -910,6 +916,11 @@ def step_impl(context, url): context.test.check_http_proxy_access('http-proxy', url) +@then("there is a record on the kinesis server with \"{record_data}\"") +def step_impl(context, record_data): + context.test.check_kinesis_server_record_data("kinesis-server", record_data) + + @then("the object on the s3 server is \"{object_data}\"") def step_impl(context, 
object_data): context.test.check_s3_server_object_data("s3-server", object_data) diff --git a/docker/test/integration/minifi/processors/PutKinesisStream.py b/docker/test/integration/minifi/processors/PutKinesisStream.py new file mode 100644 index 000000000..40e55e1c2 --- /dev/null +++ b/docker/test/integration/minifi/processors/PutKinesisStream.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ..core.Processor import Processor + + +class PutKinesisStream(Processor): + def __init__( + self, + context, + proxy_host='', + proxy_port='', + proxy_username='', + proxy_password=''): + super(PutKinesisStream, self).__init__( + context=context, + clazz='PutKinesisStream', + properties={ + 'Amazon Kinesis Stream Name': 'test_stream', + 'Access Key': 'test_access_key', + 'Secret Key': 'test_secret', + 'Endpoint Override URL': f"http://kinesis-server-{context.feature_id}:4568", + 'Proxy Host': proxy_host, + 'Proxy Port': proxy_port, + 'Proxy Username': proxy_username, + 'Proxy Password': proxy_password, + 'Region': 'us-east-1' + }, + auto_terminate=["success", "failure"]) diff --git a/docker/test/integration/resources/kinesis-mock/Dockerfile b/docker/test/integration/resources/kinesis-mock/Dockerfile new file mode 100644 index 000000000..567acfb23 --- /dev/null +++ b/docker/test/integration/resources/kinesis-mock/Dockerfile @@ -0,0 +1,8 @@ +FROM node:lts-alpine +WORKDIR /app +RUN npm i kinesis-local +RUN npm i @aws-sdk/client-kinesis @aws-sdk/node-http-handler +COPY server.json ./ +COPY consumer ./consumer + +ENTRYPOINT npx kinesis-local diff --git a/docker/test/integration/resources/kinesis-mock/consumer/consumer.js b/docker/test/integration/resources/kinesis-mock/consumer/consumer.js new file mode 100644 index 000000000..07f48cb27 --- /dev/null +++ b/docker/test/integration/resources/kinesis-mock/consumer/consumer.js @@ -0,0 +1,64 @@ +import { KinesisClient, DescribeStreamCommand, GetShardIteratorCommand, GetRecordsCommand } from "@aws-sdk/client-kinesis"; +import { NodeHttpHandler } from "@aws-sdk/node-http-handler"; + +const args = process.argv.slice(2); +if (args.length === 0) { + console.error("Usage: node consumer.js <search-string>"); + process.exit(1); +} + +const searchString = args[0]; +const streamName = "test_stream"; + +const kinesis = new KinesisClient({ + endpoint: "http://localhost:4568", + region: "us-east-1", + credentials: { accessKeyId: "fake", secretAccessKey: "fake" }, + requestHandler: new NodeHttpHandler({ http2: false }), +}); + +async function getShardIterator(shardId) { + const command = new GetShardIteratorCommand({ + StreamName: streamName, + ShardId: shardId, + ShardIteratorType: "TRIM_HORIZON", + }); + const response = await kinesis.send(command); + return response.ShardIterator; +} + +async function 
readShardRecords(shardId) { + let shardIterator = await getShardIterator(shardId); + const getRecordsCommand = new GetRecordsCommand({ ShardIterator: shardIterator }); + const data = await kinesis.send(getRecordsCommand); + + return data.Records.map(r => Buffer.from(r.Data).toString().trim()); +} + +async function readOnce() { + try { + console.log(`Checking stream '${streamName}' for: "${searchString}"`); + + const describeCommand = new DescribeStreamCommand({ StreamName: streamName }); + const { StreamDescription } = await kinesis.send(describeCommand); + + for (let shard of StreamDescription.Shards) { + console.log(`Reading from shard: ${shard.ShardId}`); + + const records = await readShardRecords(shard.ShardId); + + if (records.includes(searchString)) { + console.log(`Found "${searchString}" in records.`); + process.exit(0); + } + } + + console.log(`"${searchString}" not found in any shard.`); + process.exit(-1); + } catch (error) { + console.error("Error reading stream:", error); + process.exit(1); + } +} + +readOnce(); diff --git a/docker/test/integration/resources/kinesis-mock/consumer/package.json b/docker/test/integration/resources/kinesis-mock/consumer/package.json new file mode 100644 index 000000000..aead43de3 --- /dev/null +++ b/docker/test/integration/resources/kinesis-mock/consumer/package.json @@ -0,0 +1,3 @@ +{ + "type": "module" +} \ No newline at end of file diff --git a/docker/test/integration/resources/kinesis-mock/server.json b/docker/test/integration/resources/kinesis-mock/server.json new file mode 100644 index 000000000..cce12612b --- /dev/null +++ b/docker/test/integration/resources/kinesis-mock/server.json @@ -0,0 +1,6 @@ +{ + "server": { + "key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDeEXM0RFWMOxkB\neBP3YcJhYhiouRvtHBnMj0M79Umst1OlfB2qqZ9P2nlVWN9E+vzbbbvtJBir/bkJ\nWMnh3sj86nY5EYUlaAwi2oQMVkivHFfweK/Usspy8bAyKBwM5BNJhnAX7GdR6cXu\nv7S2fUjGbp6bkATCypROrBC4HmOSM+GabZRQA6/EiKsYa53NxmBVgrm7twO3eoeS\ntVzH15/orWJm/8ukxq//E+WI4qb0LiRT79FjWoRK2czpAzP6+JqdQTSg1msCbsod\n/+k7nc5CVMStnkKMBOk1jniAfVoqDAtGJRlIMpWXtr6vQFiZE4z6t3bl2ZY/h6p6\nqmTkL1gXAgMBAAECggEBAMJ6v8zvZ4hXHVAnDD1jlStaEMR60NU3/fQ [...] + "cert": "-----BEGIN CERTIFICATE-----\nMIIDfTCCAmWgAwIBAgIEW6AYvzANBgkqhkiG9w0BAQsFADBuMRAwDgYDVQQGEwdV\nbmtub3duMRAwDgYDVQQIEwdVbmtub3duMRAwDgYDVQQHEwdVbmtub3duMRAwDgYD\nVQQKEwdVbmtub3duMRIwEAYDVQQLEwlGUzIgVGVzdHMxEDAOBgNVBAMTB1Vua25v\nd24wIBcNMTkxMjAyMTMyOTM3WhgPMjExODA2MjYxMzI5MzdaMG4xEDAOBgNVBAYT\nB1Vua25vd24xEDAOBgNVBAgTB1Vua25vd24xEDAOBgNVBAcTB1Vua25vd24xEDAO\nBgNVBAoTB1Vua25vd24xEjAQBgNVBAsTCUZTMiBUZXN0czEQMA4GA1UEAxMHVW5r\nbm93bjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAN4Rcz [...] + } +} diff --git a/extensions/aws/AWSCredentialsProvider.cpp b/extensions/aws/AWSCredentialsProvider.cpp index a4ed3c42e..11c82d352 100644 --- a/extensions/aws/AWSCredentialsProvider.cpp +++ b/extensions/aws/AWSCredentialsProvider.cpp @@ -1,6 +1,4 @@ /** - * @file AWSCredentialsProvider.cpp - * AWSCredentialsProvider class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with diff --git a/extensions/aws/AWSCredentialsProvider.h b/extensions/aws/AWSCredentialsProvider.h index 664f84709..701b6a533 100644 --- a/extensions/aws/AWSCredentialsProvider.h +++ b/extensions/aws/AWSCredentialsProvider.h @@ -1,6 +1,4 @@ /** - * @file AWSCredentialsProvider.h - * AWSCredentialsProvider class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with diff --git a/extensions/aws/CMakeLists.txt b/extensions/aws/CMakeLists.txt index 11ae83543..a2bc0cd44 100644 --- a/extensions/aws/CMakeLists.txt +++ b/extensions/aws/CMakeLists.txt @@ -33,14 +33,20 @@ add_minifi_library(minifi-aws SHARED ${SOURCES}) target_link_libraries(minifi-aws PUBLIC ${LIBMINIFI} Threads::Threads) target_wholearchive_library_private(minifi-aws AWS::aws-cpp-sdk-s3) +target_wholearchive_library_private(minifi-aws AWS::aws-cpp-sdk-kinesis) if(CMAKE_SYSTEM_PROCESSOR MATCHES "(arm64)|(ARM64)|(aarch64)|(armv8)") target_wholearchive_library_private(minifi-aws AWS::aws-checksums) endif() -get_target_property(AWS_SDK_INCLUDE_DIRS AWS::aws-cpp-sdk-s3 INTERFACE_INCLUDE_DIRECTORIES) -target_include_directories(minifi-aws INTERFACE ${AWS_SDK_INCLUDE_DIRS}) +get_target_property(AWS_SDK_S3_INCLUDE_DIRS AWS::aws-cpp-sdk-s3 INTERFACE_INCLUDE_DIRECTORIES) +get_target_property(AWS_SDK_KINESIS_INCLUDE_DIRS AWS::aws-cpp-sdk-kinesis INTERFACE_INCLUDE_DIRECTORIES) + +target_include_directories(minifi-aws INTERFACE ${AWS_SDK_S3_INCLUDE_DIRS}) +target_include_directories(minifi-aws INTERFACE ${AWS_SDK_KINESIS_INCLUDE_DIRS}) + if(WIN32) target_compile_definitions(minifi-aws INTERFACE "AWS_CORE_API=__declspec(dllimport)") target_compile_definitions(minifi-aws INTERFACE "AWS_S3_API=__declspec(dllimport)") + target_compile_definitions(minifi-aws INTERFACE "AWS_KINESIS_API=__declspec(dllimport)") endif() register_extension(minifi-aws "AWS EXTENSIONS" AWS-EXTENSIONS "This enables AWS support" "extensions/aws/tests") diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/AwsProcessor.cpp similarity index 69% copy from extensions/aws/processors/S3Processor.cpp copy to extensions/aws/processors/AwsProcessor.cpp index 50351cec1..4ff850ee0 100644 --- a/extensions/aws/processors/S3Processor.cpp +++ b/extensions/aws/processors/AwsProcessor.cpp @@ -15,13 +15,13 @@ * limitations under the License. 
*/ -#include "S3Processor.h" +#include "AwsProcessor.h" #include <memory> #include <string> #include <utility> -#include "AWSCredentialsService.h" +#include "controllerservices/AWSCredentialsService.h" #include "S3Wrapper.h" #include "core/ProcessContext.h" #include "properties/Properties.h" @@ -32,18 +32,12 @@ namespace org::apache::nifi::minifi::aws::processors { -S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) +AwsProcessor::AwsProcessor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) : core::ProcessorImpl(name, uuid), logger_(std::move(logger)) { } -S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender) - : core::ProcessorImpl(name, uuid), - logger_(std::move(logger)), - s3_wrapper_(std::move(s3_request_sender)) { -} - -std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const { +std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const { if (const auto aws_credentials_service = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID())) { return (*aws_credentials_service)->getAWSCredentials(); } @@ -52,7 +46,7 @@ std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromContr return std::nullopt; } -std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentials( +std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentials( core::ProcessContext& context, const core::FlowFile* const flow_file) { auto service_cred = getAWSCredentialsFromControllerService(context); @@ -78,8 +72,8 @@ std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentials( return aws_credentials_provider.getAWSCredentials(); } -std::optional<aws::s3::ProxyOptions> S3Processor::getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file) { - aws::s3::ProxyOptions proxy; +aws::ProxyOptions AwsProcessor::getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file) { + aws::ProxyOptions proxy; proxy.host = minifi::utils::parseOptionalProperty(context, ProxyHost, flow_file).value_or(""); proxy.port = gsl::narrow<uint32_t>(minifi::utils::parseOptionalU64Property(context, ProxyPort, flow_file).value_or(0)); @@ -87,22 +81,19 @@ std::optional<aws::s3::ProxyOptions> S3Processor::getProxy(core::ProcessContext& proxy.password = minifi::utils::parseOptionalProperty(context, ProxyPassword, flow_file).value_or(""); if (!proxy.host.empty()) { - logger_->log_info("Proxy for S3Processor was set."); + logger_->log_info("Proxy for AwsProcessor was set."); } return proxy; } -void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { +void AwsProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { client_config_ = Aws::Client::ClientConfiguration(); - if (!getProperty(Bucket.name)) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or invalid"); - } client_config_->region = context.getProperty(Region) | minifi::utils::orThrow("Region property missing or invalid"); - logger_->log_debug("S3Processor: Region [{}]", client_config_->region); + logger_->log_debug("AwsProcessor: 
Region [{}]", client_config_->region); if (auto communications_timeout = minifi::utils::parseOptionalDurationProperty(context, CommunicationsTimeout)) { - logger_->log_debug("S3Processor: Communications Timeout {}", *communications_timeout); + logger_->log_debug("AwsProcessor: Communications Timeout {}", *communications_timeout); client_config_->connectTimeoutMs = gsl::narrow<long>(communications_timeout->count()); // NOLINT(runtime/int,google-runtime-int) } else { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid"); @@ -114,17 +105,10 @@ void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSession } } -std::optional<CommonProperties> S3Processor::getCommonELSupportedProperties( +std::optional<CommonProperties> AwsProcessor::getCommonELSupportedProperties( core::ProcessContext& context, const core::FlowFile* const flow_file) { CommonProperties properties; - if (auto bucket = context.getProperty(Bucket, flow_file); !bucket || bucket->empty()) { - logger_->log_error("Bucket '{}' is invalid or empty!", properties.bucket); - return std::nullopt; - } else { - properties.bucket = *bucket; - } - logger_->log_debug("S3Processor: Bucket [{}]", properties.bucket); auto credentials = getAWSCredentials(context, flow_file); if (!credentials) { @@ -132,17 +116,12 @@ std::optional<CommonProperties> S3Processor::getCommonELSupportedProperties( return std::nullopt; } properties.credentials = credentials.value(); - - auto proxy = getProxy(context, flow_file); - if (!proxy) { - return std::nullopt; - } - properties.proxy = proxy.value(); + properties.proxy = getProxy(context, flow_file); const auto endpoint_override_url = context.getProperty(EndpointOverrideURL, flow_file); if (endpoint_override_url) { properties.endpoint_override_url = *endpoint_override_url; - logger_->log_debug("S3Processor: Endpoint Override URL [{}]", properties.endpoint_override_url); + logger_->log_debug("AwsProcessor: Endpoint Override URL [{}]", properties.endpoint_override_url); } return properties; diff --git a/extensions/aws/processors/S3Processor.h b/extensions/aws/processors/AwsProcessor.h similarity index 88% copy from extensions/aws/processors/S3Processor.h copy to extensions/aws/processors/AwsProcessor.h index fe628e27c..9f0055e2e 100644 --- a/extensions/aws/processors/S3Processor.h +++ b/extensions/aws/processors/AwsProcessor.h @@ -1,6 +1,4 @@ /** - * @file S3Processor.h - * Base S3 processor class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with @@ -29,16 +27,13 @@ #include <utility> #include "aws/core/auth/AWSCredentialsProvider.h" -#include "S3Wrapper.h" #include "AWSCredentialsProvider.h" -#include "core/Property.h" +#include "utils/ProxyOptions.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" #include "core/Processor.h" -#include "core/logging/Logger.h" -#include "core/logging/LoggerFactory.h" -#include "utils/OptionalUtils.h" + namespace org::apache::nifi::minifi::aws::processors { @@ -94,21 +89,13 @@ inline constexpr auto REGIONS = std::array{ } // namespace region struct CommonProperties { - std::string bucket; - std::string object_key; Aws::Auth::AWSCredentials credentials; - aws::s3::ProxyOptions proxy; + aws::ProxyOptions proxy; std::string endpoint_override_url; }; -class S3Processor : public core::ProcessorImpl { +class AwsProcessor : public core::ProcessorImpl { public: - EXTENSIONAPI static constexpr auto Bucket = core::PropertyDefinitionBuilder<>::createProperty("Bucket") - .withDescription("The S3 bucket") - .isRequired(true) - .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) - .supportsExpressionLanguage(true) - .build(); EXTENSIONAPI static constexpr auto AccessKey = core::PropertyDefinitionBuilder<>::createProperty("Access Key") .withDescription("AWS account access key") .supportsExpressionLanguage(true) @@ -141,8 +128,8 @@ class S3Processor : public core::ProcessorImpl { "port, and path. The AWS libraries select an endpoint URL based on the AWS " "region, but this property overrides the selected endpoint URL, allowing use " "with other S3-compatible endpoints.") - .supportsExpressionLanguage(true) .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) + .supportsExpressionLanguage(true) .build(); EXTENSIONAPI static constexpr auto ProxyHost = core::PropertyDefinitionBuilder<>::createProperty("Proxy Host") .withDescription("Proxy host name or IP") @@ -168,7 +155,6 @@ class S3Processor : public core::ProcessorImpl { .isRequired(true) .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ - Bucket, AccessKey, SecretKey, CredentialsFile, @@ -184,20 +170,17 @@ class S3Processor : public core::ProcessorImpl { }); - explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger); + explicit AwsProcessor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger); void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; protected: - explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender); - std::optional<Aws::Auth::AWSCredentials> getAWSCredentialsFromControllerService(core::ProcessContext& context) const; - std::optional<Aws::Auth::AWSCredentials> getAWSCredentials(core::ProcessContext& context, const core::FlowFile* const flow_file); - std::optional<aws::s3::ProxyOptions> getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file); - std::optional<CommonProperties> getCommonELSupportedProperties(core::ProcessContext& context, const core::FlowFile* const flow_file); + std::optional<Aws::Auth::AWSCredentials> getAWSCredentials(core::ProcessContext& context, const core::FlowFile* flow_file); + aws::ProxyOptions 
getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file); + std::optional<CommonProperties> getCommonELSupportedProperties(core::ProcessContext& context, const core::FlowFile* flow_file); std::shared_ptr<core::logging::Logger> logger_; - aws::s3::S3Wrapper s3_wrapper_; std::optional<Aws::Client::ClientConfiguration> client_config_; }; diff --git a/extensions/aws/processors/DeleteS3Object.cpp b/extensions/aws/processors/DeleteS3Object.cpp index e02efaf5b..84b375e6e 100644 --- a/extensions/aws/processors/DeleteS3Object.cpp +++ b/extensions/aws/processors/DeleteS3Object.cpp @@ -32,9 +32,10 @@ void DeleteS3Object::initialize() { } std::optional<aws::s3::DeleteObjectRequestParameters> DeleteS3Object::buildDeleteS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const { + const CommonProperties& common_properties, + const std::string_view bucket) const { gsl_Expects(client_config_); aws::s3::DeleteObjectRequestParameters params(common_properties.credentials, *client_config_); if (const auto object_key = context.getProperty(ObjectKey, &flow_file)) { @@ -51,7 +52,7 @@ std::optional<aws::s3::DeleteObjectRequestParameters> DeleteS3Object::buildDelet } logger_->log_debug("DeleteS3Object: Version [{}]", params.version); - params.bucket = common_properties.bucket; + params.bucket = bucket; params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url); return params; } @@ -70,17 +71,25 @@ void DeleteS3Object::onTrigger(core::ProcessContext& context, core::ProcessSessi return; } - auto params = buildDeleteS3RequestParams(context, *flow_file, *common_properties); + auto bucket = context.getProperty(Bucket.name, flow_file.get()); + if (!bucket) { + logger_->log_error("Bucket is invalid due to {}", bucket.error().message()); + session.transfer(flow_file, Failure); + return; + } + logger_->log_debug("S3Processor: Bucket [{}]", *bucket); + + auto params = buildDeleteS3RequestParams(context, *flow_file, *common_properties, *bucket); if (!params) { session.transfer(flow_file, Failure); return; } if (s3_wrapper_.deleteObject(*params)) { - logger_->log_debug("Successfully deleted S3 object '{}' from bucket '{}'", params->object_key, common_properties->bucket); + logger_->log_debug("Successfully deleted S3 object '{}' from bucket '{}'", params->object_key, *bucket); session.transfer(flow_file, Success); } else { - logger_->log_error("Failed to delete S3 object '{}' from bucket '{}'", params->object_key, common_properties->bucket); + logger_->log_error("Failed to delete S3 object '{}' from bucket '{}'", params->object_key, *bucket); session.transfer(flow_file, Failure); } } diff --git a/extensions/aws/processors/DeleteS3Object.h b/extensions/aws/processors/DeleteS3Object.h index 94dbf0f4f..2d983d2a8 100644 --- a/extensions/aws/processors/DeleteS3Object.h +++ b/extensions/aws/processors/DeleteS3Object.h @@ -81,9 +81,10 @@ class DeleteS3Object : public S3Processor { } std::optional<aws::s3::DeleteObjectRequestParameters> buildDeleteS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const; + const CommonProperties &common_properties, + std::string_view bucket) const; }; } // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp index 
194d68f54..9a7bb0ec4 100644 --- a/extensions/aws/processors/FetchS3Object.cpp +++ b/extensions/aws/processors/FetchS3Object.cpp @@ -1,6 +1,4 @@ /** - * @file FetchS3Object.cpp - * FetchS3Object class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -43,12 +41,13 @@ void FetchS3Object::onSchedule(core::ProcessContext& context, core::ProcessSessi } std::optional<aws::s3::GetObjectRequestParameters> FetchS3Object::buildFetchS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const { + const CommonProperties &common_properties, + const std::string_view bucket) const { gsl_Expects(client_config_); minifi::aws::s3::GetObjectRequestParameters get_object_params(common_properties.credentials, *client_config_); - get_object_params.bucket = common_properties.bucket; + get_object_params.bucket = bucket; get_object_params.requester_pays = requester_pays_; if (const auto object_key = context.getProperty(ObjectKey, &flow_file)) { @@ -82,7 +81,15 @@ void FetchS3Object::onTrigger(core::ProcessContext& context, core::ProcessSessio return; } - auto get_object_params = buildFetchS3RequestParams(context, *flow_file, *common_properties); + auto bucket = context.getProperty(Bucket.name, flow_file.get()); + if (!bucket) { + logger_->log_error("Bucket is invalid due to {}", bucket.error().message()); + session.transfer(flow_file, Failure); + return; + } + logger_->log_debug("S3Processor: Bucket [{}]", *bucket); + + auto get_object_params = buildFetchS3RequestParams(context, *flow_file, *common_properties, *bucket); if (!get_object_params) { session.transfer(flow_file, Failure); return; diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h index 5505fff59..ea933b4e7 100644 --- a/extensions/aws/processors/FetchS3Object.h +++ b/extensions/aws/processors/FetchS3Object.h @@ -92,9 +92,10 @@ class FetchS3Object : public S3Processor { } std::optional<aws::s3::GetObjectRequestParameters> buildFetchS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const; + const CommonProperties &common_properties, + std::string_view bucket) const; bool requester_pays_ = false; }; diff --git a/extensions/aws/processors/ListS3.cpp b/extensions/aws/processors/ListS3.cpp index a3a482ae4..f11bca209 100644 --- a/extensions/aws/processors/ListS3.cpp +++ b/extensions/aws/processors/ListS3.cpp @@ -25,6 +25,7 @@ #include "core/ProcessSession.h" #include "core/Resource.h" #include "utils/ProcessorConfigUtils.h" +#include "utils/expected.h" namespace org::apache::nifi::minifi::aws::processors { @@ -47,10 +48,13 @@ void ListS3::onSchedule(core::ProcessContext& context, core::ProcessSessionFacto throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required property is not set or invalid"); } + auto bucket = context.getProperty(Bucket.name) | minifi::utils::orThrow("Required property"); + logger_->log_debug("S3Processor: Bucket [{}]", bucket); + gsl_Expects(client_config_); list_request_params_ = std::make_unique<aws::s3::ListRequestParameters>(common_properties->credentials, *client_config_); list_request_params_->setClientConfig(common_properties->proxy, common_properties->endpoint_override_url); - list_request_params_->bucket = common_properties->bucket; + 
list_request_params_->bucket = bucket; if (const auto delimiter = context.getProperty(Delimiter)) { list_request_params_->delimiter = *delimiter; diff --git a/extensions/aws/processors/PutKinesisStream.cpp b/extensions/aws/processors/PutKinesisStream.cpp new file mode 100644 index 000000000..e34b79ea5 --- /dev/null +++ b/extensions/aws/processors/PutKinesisStream.cpp @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutKinesisStream.h" + +#include <memory> +#include <random> +#include <ranges> +#include <unordered_map> + +#include "aws/kinesis/KinesisClient.h" +#include "aws/kinesis/model/PutRecordsRequest.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "range/v3/algorithm/for_each.hpp" +#include "range/v3/view.hpp" +#include "utils/ProcessorConfigUtils.h" + +namespace org::apache::nifi::minifi::aws::processors { + +void PutKinesisStream::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void PutKinesisStream::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { + AwsProcessor::onSchedule(context, session_factory); + + batch_size_ = parseU64Property(context, MessageBatchSize); + if (batch_size_ == 0 || batch_size_ > 500) { + logger_->log_warn("{} is invalid. Setting it to the maximum 500 value.", MessageBatchSize.name); + batch_size_ = 500; + } + batch_data_size_soft_cap_ = parseDataSizeProperty(context, MaxBatchDataSize); + if (batch_data_size_soft_cap_ > 4_MB) { + logger_->log_warn("{} is invalid. 
Setting it to the maximum 4 MB value.", MaxBatchDataSize.name); + batch_data_size_soft_cap_ = 4_MB; + } + + endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | minifi::utils::toOptional(); +} + +nonstd::expected<Aws::Kinesis::Model::PutRecordsRequestEntry, PutKinesisStream::BatchItemError> PutKinesisStream::createEntryFromFlowFile(const core::ProcessContext& context, + core::ProcessSession& session, + const std::shared_ptr<core::FlowFile>& flow_file) const { + Aws::Kinesis::Model::PutRecordsRequestEntry entry; + const auto partition_key = context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get()) | minifi::utils::valueOrElse([&flow_file] { return flow_file->getUUID().to_string(); }); + entry.SetPartitionKey(partition_key); + const auto [status, buffer] = session.readBuffer(flow_file); + if (io::isError(status)) { + logger_->log_error("Couldn't read content from {}", flow_file->getUUIDStr()); + return nonstd::make_unexpected(BatchItemError{.error_message = "Failed to read content", .error_code = std::nullopt}); + } + Aws::Utils::ByteBuffer aws_buffer(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size()); + entry.SetData(aws_buffer); + return entry; +} + +std::unordered_map<std::string, PutKinesisStream::StreamBatch> PutKinesisStream::createStreamBatches(const core::ProcessContext& context, core::ProcessSession& session) const { + static constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB; + std::unordered_map<std::string, StreamBatch> stream_batches; + uint64_t ff_count_in_batches = 0; + while (ff_count_in_batches < batch_size_) { + std::shared_ptr<core::FlowFile> flow_file = session.get(); + if (!flow_file) { break; } + const auto flow_file_size = flow_file->getSize(); + if (flow_file_size > SINGLE_RECORD_MAX_SIZE) { + flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("record too big {}, max allowed {}", flow_file_size, SINGLE_RECORD_MAX_SIZE)); + session.transfer(flow_file, Failure); + logger_->log_error("Failed to publish to kinesis record {} because the size was greater than {} bytes", flow_file->getUUID().to_string(), SINGLE_RECORD_MAX_SIZE); + continue; + } + + auto stream_name = context.getProperty(AmazonKinesisStreamName.name, flow_file.get()); + if (!stream_name) { + logger_->log_error("Stream name is invalid due to {}", stream_name.error().message()); + flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("Stream name is invalid due to {}", stream_name.error().message())); + session.transfer(flow_file, Failure); + continue; + } + + auto entry = createEntryFromFlowFile(context, session, flow_file); + if (!entry) { + flow_file->addAttribute(AwsKinesisErrorMessage.name, entry.error().error_message); + if (entry.error().error_code) { + flow_file->addAttribute(AwsKinesisErrorCode.name, *entry.error().error_code); + } + session.transfer(flow_file, Failure); + continue; + } + + auto [stream_batch, newly_created] = stream_batches.emplace(*stream_name, StreamBatch{}); + if (newly_created) { + stream_batch->second.request.SetStreamName(*stream_name); + } + stream_batch->second.request.AddRecords(*entry); + stream_batch->second.items.push_back(BatchItem{.flow_file = std::move(flow_file), .result = BatchItemResult{}}); + stream_batches[*stream_name].batch_size += flow_file_size; + ++ff_count_in_batches; + + if (stream_batches[*stream_name].batch_size > batch_data_size_soft_cap_) { + break; + } + } + return stream_batches; +} + +void PutKinesisStream::processBatch(StreamBatch& stream_batch, const 
Aws::Kinesis::KinesisClient& client) const { + const auto put_record_result = client.PutRecords(stream_batch.request); + if (!put_record_result.IsSuccess()) { + ranges::for_each(stream_batch.items, [&](auto& item) { + item.result = nonstd::make_unexpected(BatchItemError{ + .error_message = put_record_result.GetError().GetMessage(), + .error_code = std::to_string(static_cast<int>(put_record_result.GetError().GetResponseCode()))}); + }); + return; + } + + const auto result_records = put_record_result.GetResult().GetRecords(); + if (result_records.size() != stream_batch.items.size()) { + logger_->log_critical("PutKinesisStream record size ({}) and result size ({}) mismatch in {} cannot tell which record succeeded and which didnt", + stream_batch.items.size(), result_records.size(), stream_batch.request.GetStreamName()); + ranges::for_each(stream_batch.items, [&](auto& item) { + item.result = nonstd::make_unexpected(BatchItemError{ + .error_message = "Record size mismatch", + .error_code = std::nullopt}); + }); + return; + } + + for (uint64_t i = 0; i < stream_batch.items.size(); i++) { + auto& [flow_file, result] = stream_batch.items[i]; + const auto& result_record = result_records[i]; + if (!result_record.GetErrorCode().empty()) { + result = nonstd::make_unexpected(BatchItemError{.error_message = result_record.GetErrorMessage(), .error_code = result_record.GetErrorCode()}); + } else { + result = BatchItemResult{.sequence_number = result_record.GetSequenceNumber(), .shard_id = result_record.GetShardId()}; + } + } +} + +void PutKinesisStream::transferFlowFiles(core::ProcessSession& session, const StreamBatch& stream_batch) { + for (const auto& batch_item : stream_batch.items) { + if (batch_item.result) { + batch_item.flow_file->setAttribute(AwsKinesisSequenceNumber.name, batch_item.result->sequence_number); + batch_item.flow_file->setAttribute(AwsKinesisShardId.name, batch_item.result->shard_id); + session.transfer(batch_item.flow_file, Success); + } else { + batch_item.flow_file->setAttribute(AwsKinesisErrorMessage.name, batch_item.result.error().error_message); + if (batch_item.result.error().error_code) { + batch_item.flow_file->setAttribute(AwsKinesisErrorCode.name, *batch_item.result.error().error_code); + } + session.transfer(batch_item.flow_file, Failure); + } + } +} + + +void PutKinesisStream::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + logger_->log_trace("PutKinesisStream onTrigger"); + + const auto credentials = getAWSCredentials(context, nullptr); + if (!credentials) { + logger_->log_error("Failed to get credentials for PutKinesisStream"); + context.yield(); + return; + } + + auto stream_batches = createStreamBatches(context, session); + if (stream_batches.empty()) { + context.yield(); + return; + } + const auto kinesis_client = getClient(*credentials); + + for (auto& stream_batch: stream_batches | std::views::values) { + processBatch(stream_batch, *kinesis_client); + transferFlowFiles(session, stream_batch); + } +} + +std::unique_ptr<Aws::Kinesis::KinesisClient> PutKinesisStream::getClient(const Aws::Auth::AWSCredentials& credentials) { + gsl_Expects(client_config_); + auto client = std::make_unique<Aws::Kinesis::KinesisClient>(credentials, *client_config_); + if (endpoint_override_url_) { + client->OverrideEndpoint(*endpoint_override_url_); + } + return client; +} + +REGISTER_RESOURCE(PutKinesisStream, Processor); + +} // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/processors/PutKinesisStream.h 
b/extensions/aws/processors/PutKinesisStream.h new file mode 100644 index 000000000..c30483a20 --- /dev/null +++ b/extensions/aws/processors/PutKinesisStream.h @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + + +#include <memory> +#include <optional> +#include <sstream> +#include <string> +#include <utility> + +#include "S3Processor.h" +#include "aws/kinesis/KinesisClient.h" +#include "aws/kinesis/model/PutRecordsRequest.h" +#include "core/PropertyDefinitionBuilder.h" +#include "utils/ArrayUtils.h" +#include "utils/expected.h" + +namespace org::apache::nifi::minifi::aws::processors { + +class PutKinesisStream : public AwsProcessor { + public: + EXTENSIONAPI static constexpr const char* Description = "Sends the contents to a specified Amazon Kinesis. In order to send data to Kinesis, the stream name has to be specified."; + + EXTENSIONAPI static constexpr auto AmazonKinesisStreamName = core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream Name") + .withDescription("The name of Kinesis Stream") + .isRequired(true) + .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto AmazonKinesisStreamPartitionKey = core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream Partition Key") + .withDescription("The partition key attribute. If it is not set, a random value is used") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto MessageBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Size") + .withDescription("Batch size for messages. [1-500]") + .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) + .withDefaultValue("250") + .build(); + EXTENSIONAPI static constexpr auto MaxBatchDataSize = core::PropertyDefinitionBuilder<>::createProperty("Max Batch Data Size") + .withDescription("Soft cap on the data size of the batch to a single stream. 
(max 4MB)") + .withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR) + .withDefaultValue("1 MB") + .build(); + + EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(AwsProcessor::Properties, std::to_array<core::PropertyReference>({ + AmazonKinesisStreamName, AmazonKinesisStreamPartitionKey, MessageBatchSize, MaxBatchDataSize + })); + + EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles are routed to success relationship"}; + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "FlowFiles are routed to failure relationship"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; + + EXTENSIONAPI static constexpr auto AwsKinesisErrorMessage = core::OutputAttributeDefinition<>{"aws.kinesis.error.message", { Failure }, + "Error message on posting message to AWS Kinesis"}; + EXTENSIONAPI static constexpr auto AwsKinesisErrorCode = core::OutputAttributeDefinition<>{"aws.kinesis.error.code", { Failure }, + "Error code for the message when posting to AWS Kinesis"}; + EXTENSIONAPI static constexpr auto AwsKinesisSequenceNumber = core::OutputAttributeDefinition<>{"aws.kinesis.sequence.number", { Success }, + "Sequence number for the message when posting to AWS Kinesis"}; + EXTENSIONAPI static constexpr auto AwsKinesisShardId = core::OutputAttributeDefinition<>{"aws.kinesis.shard.id", { Success }, + "Shard id of the message posted to AWS Kinesis"}; + EXTENSIONAPI static constexpr auto OutputAttributes = std::to_array<core::OutputAttributeReference>({ + AwsKinesisErrorMessage, + AwsKinesisErrorCode, + AwsKinesisSequenceNumber, + AwsKinesisShardId}); + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = false; + + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + explicit PutKinesisStream(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier()) + : AwsProcessor(name, uuid, core::logging::LoggerFactory<PutKinesisStream>::getLogger(uuid)) { + } + PutKinesisStream(const PutKinesisStream&) = delete; + PutKinesisStream(PutKinesisStream&&) = delete; + PutKinesisStream& operator=(const PutKinesisStream&) = delete; + PutKinesisStream& operator=(PutKinesisStream&&) = delete; + ~PutKinesisStream() override = default; + + void initialize() override; + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + + protected: + virtual std::unique_ptr<Aws::Kinesis::KinesisClient> getClient(const Aws::Auth::AWSCredentials& credentials); + + private: + struct BatchItemResult { + std::string sequence_number; + std::string shard_id; + }; + struct BatchItemError { + std::string error_message; + std::optional<std::string> error_code; + }; + struct BatchItem { + std::shared_ptr<core::FlowFile> flow_file; + nonstd::expected<BatchItemResult, BatchItemError> result; + }; + struct StreamBatch { + uint64_t batch_size = 0; + std::vector<BatchItem> items; + Aws::Kinesis::Model::PutRecordsRequest request; + }; + + nonstd::expected<Aws::Kinesis::Model::PutRecordsRequestEntry, BatchItemError> createEntryFromFlowFile(const core::ProcessContext& context, + core::ProcessSession& session, 
+ const std::shared_ptr<core::FlowFile>& flow_file) const; + + std::unordered_map<std::string, StreamBatch> createStreamBatches(const core::ProcessContext& context, core::ProcessSession& session) const; + void processBatch(StreamBatch& stream_batch, const Aws::Kinesis::KinesisClient& client) const; + static void transferFlowFiles(core::ProcessSession& session, const StreamBatch& stream_batch); + + uint64_t batch_size_ = 250; + uint64_t batch_data_size_soft_cap_ = 1_MB; + const utils::AWSInitializer& AWS_INITIALIZER = utils::AWSInitializer::get(); + std::optional<std::string> endpoint_override_url_; +}; + +} // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp index b1d7b972c..ddb218f98 100644 --- a/extensions/aws/processors/PutS3Object.cpp +++ b/extensions/aws/processors/PutS3Object.cpp @@ -119,7 +119,7 @@ std::string PutS3Object::parseAccessControlList(const std::string &comma_separat } bool PutS3Object::setCannedAcl( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const { if (const auto canned_acl = context.getProperty(CannedACL, &flow_file)) { @@ -135,7 +135,7 @@ bool PutS3Object::setCannedAcl( } bool PutS3Object::setAccessControl( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const { if (const auto full_control_user_list = context.getProperty(FullControlUserList, &flow_file)) { @@ -159,13 +159,14 @@ bool PutS3Object::setAccessControl( } std::optional<aws::s3::PutObjectRequestParameters> PutS3Object::buildPutS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const { + const CommonProperties &common_properties, + const std::string_view bucket) const { gsl_Expects(client_config_); aws::s3::PutObjectRequestParameters params(common_properties.credentials, *client_config_); params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url); - params.bucket = common_properties.bucket; + params.bucket = bucket; params.user_metadata_map = user_metadata_map_; params.server_side_encryption = server_side_encryption_; params.storage_class = storage_class_; @@ -215,7 +216,7 @@ void PutS3Object::setAttributes( } } -void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) { +void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties, const std::string_view bucket) { { std::lock_guard<std::mutex> lock(last_ageoff_mutex_); const auto now = std::chrono::system_clock::now(); @@ -229,7 +230,7 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properti logger_->log_trace("Listing aged off multipart uploads still in progress."); aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_); list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url); - list_params.bucket = common_properties.bucket; + list_params.bucket = bucket; list_params.age_off_limit = multipart_upload_max_age_threshold_; list_params.use_virtual_addressing = use_virtual_addressing_; auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params); @@ -238,14 +239,14 @@ void 
PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properti return; } - logger_->log_info("Found {} aged off pending multipart upload jobs in bucket '{}'", aged_off_uploads_in_progress->size(), common_properties.bucket); + logger_->log_info("Found {} aged off pending multipart upload jobs in bucket '{}'", aged_off_uploads_in_progress->size(), bucket); size_t aborted = 0; for (const auto& upload : *aged_off_uploads_in_progress) { logger_->log_info("Aborting multipart upload with key '{}' and upload id '{}' in bucket '{}' due to reaching maximum upload age threshold.", - upload.key, upload.upload_id, common_properties.bucket); + upload.key, upload.upload_id, bucket); aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_); abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url); - abort_params.bucket = common_properties.bucket; + abort_params.bucket = bucket; abort_params.key = upload.key; abort_params.upload_id = upload.upload_id; abort_params.use_virtual_addressing = use_virtual_addressing_; @@ -256,7 +257,7 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properti ++aborted; } if (aborted > 0) { - logger_->log_info("Aborted {} pending multipart upload jobs in bucket '{}'", aborted, common_properties.bucket); + logger_->log_info("Aborted {} pending multipart upload jobs in bucket '{}'", aborted, bucket); } s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_); } @@ -275,9 +276,17 @@ void PutS3Object::onTrigger(core::ProcessContext& context, core::ProcessSession& return; } - ageOffMultipartUploads(*common_properties); + auto bucket = context.getProperty(Bucket.name, flow_file.get()); + if (!bucket) { + logger_->log_error("Bucket is invalid due to {}", bucket.error().message()); + session.transfer(flow_file, Failure); + return; + } + logger_->log_debug("S3Processor: Bucket [{}]", *bucket); + + ageOffMultipartUploads(*common_properties, *bucket); - auto put_s3_request_params = buildPutS3RequestParams(context, *flow_file, *common_properties); + auto put_s3_request_params = buildPutS3RequestParams(context, *flow_file, *common_properties, *bucket); if (!put_s3_request_params) { session.transfer(flow_file, Failure); return; diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h index 6e2ca0b47..1af4d1cfd 100644 --- a/extensions/aws/processors/PutS3Object.h +++ b/extensions/aws/processors/PutS3Object.h @@ -210,18 +210,19 @@ class PutS3Object : public S3Processor { void fillUserMetadata(core::ProcessContext& context); static std::string parseAccessControlList(const std::string &comma_separated_list); - bool setCannedAcl(core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const; - bool setAccessControl(core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const; + bool setCannedAcl(const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const; + bool setAccessControl(const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const; void setAttributes( core::ProcessSession& session, core::FlowFile& flow_file, const aws::s3::PutObjectRequestParameters &put_s3_request_params, const minifi::aws::s3::PutObjectResult 
&put_object_result) const; std::optional<aws::s3::PutObjectRequestParameters> buildPutS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const; - void ageOffMultipartUploads(const CommonProperties &common_properties); + const CommonProperties &common_properties, + std::string_view bucket) const; + void ageOffMultipartUploads(const CommonProperties &common_properties, const std::string_view bucket); std::string user_metadata_; std::map<std::string, std::string> user_metadata_map_; diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp index 50351cec1..6be93f626 100644 --- a/extensions/aws/processors/S3Processor.cpp +++ b/extensions/aws/processors/S3Processor.cpp @@ -32,120 +32,19 @@ namespace org::apache::nifi::minifi::aws::processors { -S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) - : core::ProcessorImpl(name, uuid), - logger_(std::move(logger)) { -} +S3Processor::S3Processor(const std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) + : AwsProcessor(name, uuid, std::move(logger)) {} -S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender) - : core::ProcessorImpl(name, uuid), - logger_(std::move(logger)), +S3Processor::S3Processor(const std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender) + : AwsProcessor(name, uuid, std::move(logger)), s3_wrapper_(std::move(s3_request_sender)) { } -std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const { - if (const auto aws_credentials_service = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID())) { - return (*aws_credentials_service)->getAWSCredentials(); - } - logger_->log_error("AWS credentials service could not be found"); - - return std::nullopt; -} - -std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentials( - core::ProcessContext& context, - const core::FlowFile* const flow_file) { - auto service_cred = getAWSCredentialsFromControllerService(context); - if (service_cred) { - logger_->log_info("AWS Credentials successfully set from controller service"); - return service_cred.value(); - } - - aws::AWSCredentialsProvider aws_credentials_provider; - if (const auto access_key = context.getProperty(AccessKey.name, flow_file)) { - aws_credentials_provider.setAccessKey(*access_key); - } - if (const auto secret_key = context.getProperty(SecretKey.name, flow_file)) { - aws_credentials_provider.setSecretKey(*secret_key); - } - if (const auto credentials_file = context.getProperty(CredentialsFile.name, flow_file)) { - aws_credentials_provider.setCredentialsFile(*credentials_file); - } - if (const auto use_credentials = context.getProperty(UseDefaultCredentials.name, flow_file) | minifi::utils::andThen(parsing::parseBool)) { - aws_credentials_provider.setUseDefaultCredentials(*use_credentials); - } - - return aws_credentials_provider.getAWSCredentials(); -} - -std::optional<aws::s3::ProxyOptions> S3Processor::getProxy(core::ProcessContext& 
context, const core::FlowFile* const flow_file) { - aws::s3::ProxyOptions proxy; - - proxy.host = minifi::utils::parseOptionalProperty(context, ProxyHost, flow_file).value_or(""); - proxy.port = gsl::narrow<uint32_t>(minifi::utils::parseOptionalU64Property(context, ProxyPort, flow_file).value_or(0)); - proxy.username = minifi::utils::parseOptionalProperty(context, ProxyUsername, flow_file).value_or(""); - proxy.password = minifi::utils::parseOptionalProperty(context, ProxyPassword, flow_file).value_or(""); - - if (!proxy.host.empty()) { - logger_->log_info("Proxy for S3Processor was set."); - } - return proxy; -} - -void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - client_config_ = Aws::Client::ClientConfiguration(); +void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { + AwsProcessor::onSchedule(context, session_factory); if (!getProperty(Bucket.name)) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or invalid"); } - - client_config_->region = context.getProperty(Region) | minifi::utils::orThrow("Region property missing or invalid"); - logger_->log_debug("S3Processor: Region [{}]", client_config_->region); - - if (auto communications_timeout = minifi::utils::parseOptionalDurationProperty(context, CommunicationsTimeout)) { - logger_->log_debug("S3Processor: Communications Timeout {}", *communications_timeout); - client_config_->connectTimeoutMs = gsl::narrow<long>(communications_timeout->count()); // NOLINT(runtime/int,google-runtime-int) - } else { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid"); - } - - static const auto default_ca_file = minifi::utils::getDefaultCAFile(); - if (default_ca_file) { - client_config_->caFile = *default_ca_file; - } -} - -std::optional<CommonProperties> S3Processor::getCommonELSupportedProperties( - core::ProcessContext& context, - const core::FlowFile* const flow_file) { - CommonProperties properties; - if (auto bucket = context.getProperty(Bucket, flow_file); !bucket || bucket->empty()) { - logger_->log_error("Bucket '{}' is invalid or empty!", properties.bucket); - return std::nullopt; - } else { - properties.bucket = *bucket; - } - logger_->log_debug("S3Processor: Bucket [{}]", properties.bucket); - - auto credentials = getAWSCredentials(context, flow_file); - if (!credentials) { - logger_->log_error("AWS Credentials have not been set!"); - return std::nullopt; - } - properties.credentials = credentials.value(); - - auto proxy = getProxy(context, flow_file); - if (!proxy) { - return std::nullopt; - } - properties.proxy = proxy.value(); - - const auto endpoint_override_url = context.getProperty(EndpointOverrideURL, flow_file); - if (endpoint_override_url) { - properties.endpoint_override_url = *endpoint_override_url; - logger_->log_debug("S3Processor: Endpoint Override URL [{}]", properties.endpoint_override_url); - } - - return properties; } } // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/processors/S3Processor.h b/extensions/aws/processors/S3Processor.h index fe628e27c..41beec73d 100644 --- a/extensions/aws/processors/S3Processor.h +++ b/extensions/aws/processors/S3Processor.h @@ -1,6 +1,4 @@ /** - * @file S3Processor.h - * Base S3 processor class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with @@ -28,80 +26,21 @@ #include <string_view> #include <utility> -#include "aws/core/auth/AWSCredentialsProvider.h" -#include "S3Wrapper.h" #include "AWSCredentialsProvider.h" +#include "AwsProcessor.h" +#include "S3Wrapper.h" +#include "aws/core/auth/AWSCredentialsProvider.h" +#include "core/Processor.h" #include "core/Property.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" -#include "minifi-cpp/core/PropertyValidator.h" -#include "core/Processor.h" #include "core/logging/Logger.h" #include "core/logging/LoggerFactory.h" #include "utils/OptionalUtils.h" namespace org::apache::nifi::minifi::aws::processors { -namespace region { -inline constexpr std::string_view AF_SOUTH_1 = "af-south-1"; -inline constexpr std::string_view AP_EAST_1 = "ap-east-1"; -inline constexpr std::string_view AP_NORTHEAST_1 = "ap-northeast-1"; -inline constexpr std::string_view AP_NORTHEAST_2 = "ap-northeast-2"; -inline constexpr std::string_view AP_NORTHEAST_3 = "ap-northeast-3"; -inline constexpr std::string_view AP_SOUTH_1 = "ap-south-1"; -inline constexpr std::string_view AP_SOUTH_2 = "ap-south-2"; -inline constexpr std::string_view AP_SOUTHEAST_1 = "ap-southeast-1"; -inline constexpr std::string_view AP_SOUTHEAST_2 = "ap-southeast-2"; -inline constexpr std::string_view AP_SOUTHEAST_3 = "ap-southeast-3"; -inline constexpr std::string_view AP_SOUTHEAST_4 = "ap-southeast-4"; -inline constexpr std::string_view AP_SOUTHEAST_5 = "ap-southeast-5"; -inline constexpr std::string_view AP_SOUTHEAST_7 = "ap-southeast-7"; -inline constexpr std::string_view CA_CENTRAL_1 = "ca-central-1"; -inline constexpr std::string_view CA_WEST_1 = "ca-west-1"; -inline constexpr std::string_view CN_NORTH_1 = "cn-north-1"; -inline constexpr std::string_view CN_NORTHWEST_1 = "cn-northwest-1"; -inline constexpr std::string_view EU_CENTRAL_1 = "eu-central-1"; -inline constexpr std::string_view EU_CENTRAL_2 = "eu-central-2"; -inline constexpr std::string_view EU_ISOE_WEST_1 = "eu-isoe-west-1"; -inline constexpr std::string_view EU_NORTH_1 = "eu-north-1"; -inline constexpr std::string_view EU_SOUTH_1 = "eu-south-1"; -inline constexpr std::string_view EU_SOUTH_2 = "eu-south-2"; -inline constexpr std::string_view EU_WEST_1 = "eu-west-1"; -inline constexpr std::string_view EU_WEST_2 = "eu-west-2"; -inline constexpr std::string_view EU_WEST_3 = "eu-west-3"; -inline constexpr std::string_view IL_CENTRAL_1 = "il-central-1"; -inline constexpr std::string_view ME_CENTRAL_1 = "me-central-1"; -inline constexpr std::string_view ME_SOUTH_1 = "me-south-1"; -inline constexpr std::string_view MX_CENTRAL_1 = "mx-central-1"; -inline constexpr std::string_view SA_EAST_1 = "sa-east-1"; -inline constexpr std::string_view US_EAST_1 = "us-east-1"; -inline constexpr std::string_view US_EAST_2 = "us-east-2"; -inline constexpr std::string_view US_GOV_EAST_1 = "us-gov-east-1"; -inline constexpr std::string_view US_GOV_WEST_1 = "us-gov-west-1"; -inline constexpr std::string_view US_ISO_EAST_1 = "us-iso-east-1"; -inline constexpr std::string_view US_ISO_WEST_1 = "us-iso-west-1"; -inline constexpr std::string_view US_ISOB_EAST_1 = "us-isob-east-1"; -inline constexpr std::string_view US_ISOF_EAST_1 = "us-isof-east-1"; -inline constexpr std::string_view US_ISOF_SOUTH_1 = "us-isof-south-1"; -inline constexpr std::string_view US_WEST_1 = "us-west-1"; -inline constexpr std::string_view US_WEST_2 = "us-west-2"; - -inline constexpr auto REGIONS = std::array{ - AF_SOUTH_1, AP_EAST_1, AP_NORTHEAST_1, 
AP_NORTHEAST_2, AP_NORTHEAST_3, AP_SOUTH_1, AP_SOUTH_2, AP_SOUTHEAST_1, AP_SOUTHEAST_2, AP_SOUTHEAST_3, AP_SOUTHEAST_4, AP_SOUTHEAST_5, AP_SOUTHEAST_7, - CA_CENTRAL_1, CA_WEST_1, CN_NORTH_1, CN_NORTHWEST_1, EU_CENTRAL_1, EU_CENTRAL_2, EU_ISOE_WEST_1, EU_NORTH_1, EU_SOUTH_1, EU_SOUTH_2, EU_WEST_1, EU_WEST_2, EU_WEST_3, IL_CENTRAL_1, ME_CENTRAL_1, - ME_SOUTH_1, MX_CENTRAL_1, SA_EAST_1, US_EAST_1, US_EAST_2, US_GOV_EAST_1, US_GOV_WEST_1, US_ISO_EAST_1, US_ISO_WEST_1, US_ISOB_EAST_1, US_ISOF_EAST_1, US_ISOF_SOUTH_1, US_WEST_1, US_WEST_2 -}; -} // namespace region - -struct CommonProperties { - std::string bucket; - std::string object_key; - Aws::Auth::AWSCredentials credentials; - aws::s3::ProxyOptions proxy; - std::string endpoint_override_url; -}; - -class S3Processor : public core::ProcessorImpl { +class S3Processor : public AwsProcessor { public: EXTENSIONAPI static constexpr auto Bucket = core::PropertyDefinitionBuilder<>::createProperty("Bucket") .withDescription("The S3 bucket") @@ -109,96 +48,16 @@ class S3Processor : public core::ProcessorImpl { .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) .supportsExpressionLanguage(true) .build(); - EXTENSIONAPI static constexpr auto AccessKey = core::PropertyDefinitionBuilder<>::createProperty("Access Key") - .withDescription("AWS account access key") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto SecretKey = core::PropertyDefinitionBuilder<>::createProperty("Secret Key") - .withDescription("AWS account secret key") - .supportsExpressionLanguage(true) - .isSensitive(true) - .build(); - EXTENSIONAPI static constexpr auto CredentialsFile = core::PropertyDefinitionBuilder<>::createProperty("Credentials File") - .withDescription("Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey") - .build(); - EXTENSIONAPI static constexpr auto AWSCredentialsProviderService = core::PropertyDefinitionBuilder<>::createProperty("AWS Credentials Provider service") - .withDescription("The name of the AWS Credentials Provider controller service that is used to obtain AWS credentials.") - .build(); - EXTENSIONAPI static constexpr auto Region = core::PropertyDefinitionBuilder<region::REGIONS.size()>::createProperty("Region") - .isRequired(true) - .withDefaultValue(region::US_WEST_2) - .withAllowedValues(region::REGIONS) - .withDescription("AWS Region") - .build(); - EXTENSIONAPI static constexpr auto CommunicationsTimeout = core::PropertyDefinitionBuilder<>::createProperty("Communications Timeout") - .isRequired(true) - .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR) - .withDefaultValue("30 sec") - .withDescription("Sets the timeout of the communication between the AWS server and the client") - .build(); - EXTENSIONAPI static constexpr auto EndpointOverrideURL = core::PropertyDefinitionBuilder<>::createProperty("Endpoint Override URL") - .withDescription("Endpoint URL to use instead of the AWS default including scheme, host, " - "port, and path. 
The AWS libraries select an endpoint URL based on the AWS " - "region, but this property overrides the selected endpoint URL, allowing use " - "with other S3-compatible endpoints.") - .supportsExpressionLanguage(true) - .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) - .build(); - EXTENSIONAPI static constexpr auto ProxyHost = core::PropertyDefinitionBuilder<>::createProperty("Proxy Host") - .withDescription("Proxy host name or IP") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto ProxyPort = core::PropertyDefinitionBuilder<>::createProperty("Proxy Port") - .withDescription("The port number of the proxy host") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto ProxyUsername = core::PropertyDefinitionBuilder<>::createProperty("Proxy Username") - .withDescription("Username to set when authenticating against proxy") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto ProxyPassword = core::PropertyDefinitionBuilder<>::createProperty("Proxy Password") - .withDescription("Password to set when authenticating against proxy") - .supportsExpressionLanguage(true) - .isSensitive(true) - .build(); - EXTENSIONAPI static constexpr auto UseDefaultCredentials = core::PropertyDefinitionBuilder<>::createProperty("Use Default Credentials") - .withDescription("If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.") - .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) - .withDefaultValue("false") - .isRequired(true) - .build(); - EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ - Bucket, - AccessKey, - SecretKey, - CredentialsFile, - AWSCredentialsProviderService, - Region, - CommunicationsTimeout, - EndpointOverrideURL, - ProxyHost, - ProxyPort, - ProxyUsername, - ProxyPassword, - UseDefaultCredentials - }); + EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(AwsProcessor::Properties, std::to_array<core::PropertyReference>({Bucket})); - explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger); + explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger); void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; protected: explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender); - - std::optional<Aws::Auth::AWSCredentials> getAWSCredentialsFromControllerService(core::ProcessContext& context) const; - std::optional<Aws::Auth::AWSCredentials> getAWSCredentials(core::ProcessContext& context, const core::FlowFile* const flow_file); - std::optional<aws::s3::ProxyOptions> getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file); - std::optional<CommonProperties> getCommonELSupportedProperties(core::ProcessContext& context, const core::FlowFile* const flow_file); - - std::shared_ptr<core::logging::Logger> logger_; aws::s3::S3Wrapper s3_wrapper_; - std::optional<Aws::Client::ClientConfiguration> client_config_; }; } // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/s3/S3ClientRequestSender.cpp b/extensions/aws/s3/S3ClientRequestSender.cpp index 
dd6c3025b..6557288a7 100644 --- a/extensions/aws/s3/S3ClientRequestSender.cpp +++ b/extensions/aws/s3/S3ClientRequestSender.cpp @@ -1,6 +1,4 @@ /** - * @file S3ClientRequestSender.cpp - * S3ClientRequestSender class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with diff --git a/extensions/aws/s3/S3ClientRequestSender.h b/extensions/aws/s3/S3ClientRequestSender.h index 86bfc96e1..7da5fa2fc 100644 --- a/extensions/aws/s3/S3ClientRequestSender.h +++ b/extensions/aws/s3/S3ClientRequestSender.h @@ -1,6 +1,4 @@ /** - * @file S3Wrapper.h - * S3Wrapper class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with diff --git a/extensions/aws/s3/S3RequestSender.h b/extensions/aws/s3/S3RequestSender.h index a2cd2078c..0c92ad117 100644 --- a/extensions/aws/s3/S3RequestSender.h +++ b/extensions/aws/s3/S3RequestSender.h @@ -1,6 +1,4 @@ /** - * @file S3RequestSender.h - * S3RequestSender class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -55,13 +53,6 @@ namespace org::apache::nifi::minifi::aws::s3 { -struct ProxyOptions { - std::string host; - uint32_t port = 0; - std::string username; - std::string password; -}; - class S3RequestSender { public: virtual std::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest( diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp index aff06b72f..70972bbea 100644 --- a/extensions/aws/s3/S3Wrapper.cpp +++ b/extensions/aws/s3/S3Wrapper.cpp @@ -1,6 +1,4 @@ /** - * @file S3Wrapper.cpp - * S3Wrapper class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h index d943076ba..76756da09 100644 --- a/extensions/aws/s3/S3Wrapper.h +++ b/extensions/aws/s3/S3Wrapper.h @@ -1,6 +1,4 @@ /** - * @file S3Wrapper.h - * S3Wrapper class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with @@ -29,26 +27,26 @@ #include <utility> #include <vector> -#include "aws/s3/model/StorageClass.h" -#include "aws/s3/model/ServerSideEncryption.h" +#include "Exception.h" +#include "MultipartUploadStateStorage.h" +#include "S3RequestSender.h" #include "aws/s3/model/ObjectCannedACL.h" +#include "aws/s3/model/ServerSideEncryption.h" +#include "aws/s3/model/StorageClass.h" #include "aws/s3/model/ChecksumAlgorithm.h" #include "core/logging/Logger.h" #include "core/logging/LoggerFactory.h" +#include "io/InputStream.h" +#include "io/OutputStream.h" +#include "range/v3/algorithm/find.hpp" #include "utils/AWSInitializer.h" #include "utils/ConfigurationUtils.h" +#include "utils/ListingStateManager.h" #include "utils/OptionalUtils.h" #include "utils/StringUtils.h" -#include "utils/ListingStateManager.h" #include "utils/gsl.h" -#include "S3RequestSender.h" -#include "Exception.h" -#include "MultipartUploadStateStorage.h" -#include "range/v3/algorithm/find.hpp" -#include "utils/Literals.h" -#include "io/InputStream.h" -#include "io/OutputStream.h" +#include "utils/ProxyOptions.h" namespace org::apache::nifi::minifi::aws::s3 { @@ -128,7 +126,7 @@ struct RequestParameters { Aws::Auth::AWSCredentials credentials; Aws::Client::ClientConfiguration client_config; - void setClientConfig(const aws::s3::ProxyOptions& proxy, const std::string& endpoint_override_url) { + void setClientConfig(const aws::ProxyOptions& proxy, const std::string& endpoint_override_url) { client_config.proxyHost = proxy.host; client_config.proxyPort = proxy.port; client_config.proxyUserName = proxy.username; diff --git a/extensions/aws/tests/PutKinesisStreamTests.cpp b/extensions/aws/tests/PutKinesisStreamTests.cpp new file mode 100644 index 000000000..bbfda2b9c --- /dev/null +++ b/extensions/aws/tests/PutKinesisStreamTests.cpp @@ -0,0 +1,318 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <minifi-cpp/core/FlowFile.h> + +#include "aws/kinesis/model/PutRecordsRequest.h" +#include "core/Resource.h" +#include "processors/PutKinesisStream.h" +#include "unit/Catch.h" +#include "unit/SingleProcessorTestController.h" +#include "unit/TestBase.h" + +namespace org::apache::nifi::minifi::aws::processors::test { + +class MockKinesisClient final : public Aws::Kinesis::KinesisClient { + public: + enum class KinesisBehaviour { + HappyPath, + Failure, + RecordSizeMismatch, + OddsFail, + }; + + Aws::Kinesis::Model::PutRecordsOutcome PutRecords(const Aws::Kinesis::Model::PutRecordsRequest& request) const override { + switch (behaviour_) { + case KinesisBehaviour::HappyPath: { + Aws::Kinesis::Model::PutRecordsResult result; + for ([[maybe_unused]] const auto& request_entry : request.GetRecords()) { + Aws::Kinesis::Model::PutRecordsResultEntry result_entry; + result_entry.SetSequenceNumber(fmt::format("sequence_number_{}", ++sequence_number_)); + result_entry.SetShardId("shard_id"); + result.AddRecords(result_entry); + } + return result; + } + case KinesisBehaviour::Failure: { + auto err = Aws::Kinesis::KinesisError(); + err.SetResponseCode(Aws::Http::HttpResponseCode::UNAUTHORIZED); + err.SetMessage("Unauthorized"); + return err; + } + case KinesisBehaviour::RecordSizeMismatch: { + Aws::Kinesis::Model::PutRecordsResult result; + return result; + } + case KinesisBehaviour::OddsFail: { + Aws::Kinesis::Model::PutRecordsResult result; + uint8_t i = 0; + for ([[maybe_unused]] const auto& request_entry : request.GetRecords()) { + Aws::Kinesis::Model::PutRecordsResultEntry result_entry; + if (++i%2 == 0) { + result_entry.SetErrorCode("8"); + result_entry.SetErrorMessage("Some error message"); + } else { + result_entry.SetSequenceNumber(fmt::format("sequence_number_{}", ++sequence_number_)); + result_entry.SetShardId("shard_id"); + } + result.AddRecords(result_entry); + } + return result; + } + default: { throw std::invalid_argument("Unknown behaviour"); } + } + } + + KinesisBehaviour behaviour_ = KinesisBehaviour::HappyPath; + mutable uint32_t sequence_number_ = 0; +}; + +class PutKinesisStreamMocked final : public aws::processors::PutKinesisStream { + public: + static constexpr const char* Description = "PutKinesisStreamMocked"; + + explicit PutKinesisStreamMocked(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier()) + : PutKinesisStream(name, uuid) { + } + + PutKinesisStreamMocked(const PutKinesisStreamMocked&) = delete; + PutKinesisStreamMocked(PutKinesisStreamMocked&&) = delete; + PutKinesisStreamMocked& operator=(const PutKinesisStreamMocked&) = delete; + PutKinesisStreamMocked& operator=(PutKinesisStreamMocked&&) = delete; + + ~PutKinesisStreamMocked() override = default; + + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + std::unique_ptr<Aws::Kinesis::KinesisClient> getClient(const Aws::Auth::AWSCredentials&) override { + auto client = std::make_unique<MockKinesisClient>(); + client->behaviour_ = behaviour_; + return client; + } + + MockKinesisClient::KinesisBehaviour behaviour_ = MockKinesisClient::KinesisBehaviour::HappyPath; +}; +REGISTER_RESOURCE(PutKinesisStreamMocked, Processor); + +TEST_CASE("PutKinesisStream record size mismatch path") { + minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor()); + REQUIRE(put_kinesis_stream); + put_kinesis_stream->behaviour_ = 
MockKinesisClient::KinesisBehaviour::RecordSizeMismatch; + + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name"); + + + const auto result = controller.trigger({{.content = "foo"}, {.content = "bar"}}); + CHECK(result.at(PutKinesisStream::Success).empty()); + REQUIRE(result.at(PutKinesisStream::Failure).size() == 2); + const auto res_ff_1 = result.at(PutKinesisStream::Failure).at(0); + const auto res_ff_2 = result.at(PutKinesisStream::Failure).at(1); + + CHECK(controller.plan->getContent(res_ff_1) == "foo"); + CHECK(controller.plan->getContent(res_ff_2) == "bar"); + + CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "Record size mismatch"); + CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "Record size mismatch"); +} + +TEST_CASE("PutKinesisStream record size failure path") { + minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor()); + REQUIRE(put_kinesis_stream); + put_kinesis_stream->behaviour_ = MockKinesisClient::KinesisBehaviour::Failure; + + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name"); + + + const auto result = controller.trigger({{.content = "foo"}, {.content = "bar"}}); + CHECK(result.at(PutKinesisStream::Success).empty()); + REQUIRE(result.at(PutKinesisStream::Failure).size() == 2); + const auto res_ff_1 = result.at(PutKinesisStream::Failure).at(0); + const auto res_ff_2 = result.at(PutKinesisStream::Failure).at(1); + + CHECK(controller.plan->getContent(res_ff_1) == "foo"); + CHECK(controller.plan->getContent(res_ff_2) == "bar"); + + CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "Unauthorized"); + CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == "401"); + CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == "401"); + CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "Unauthorized"); +} + +TEST_CASE("PutKinesisStream partial failure path") { + minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor()); + REQUIRE(put_kinesis_stream); + put_kinesis_stream->behaviour_ = MockKinesisClient::KinesisBehaviour::OddsFail; + + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name"); + + + const auto result = controller.trigger({{.content = "foo"}, {.content = "bar"}}); + REQUIRE(result.at(PutKinesisStream::Success).size() == 1); + REQUIRE(result.at(PutKinesisStream::Failure).size() == 1); + const auto succ_ff = result.at(PutKinesisStream::Success).at(0); + const auto fail_ff = 
result.at(PutKinesisStream::Failure).at(0); + + CHECK(controller.plan->getContent(succ_ff) == "foo"); + CHECK(controller.plan->getContent(fail_ff) == "bar"); + + CHECK(succ_ff->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_1"); + CHECK(succ_ff->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id"); + CHECK(fail_ff->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == "8"); + CHECK(fail_ff->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "Some error message"); +} + + +TEST_CASE("PutKinesisStream simple happy path") { + minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = controller.getProcessor(); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name"); + + + const auto result = controller.trigger({{.content = "foo"}, {.content = "bar"}}); + CHECK(result.at(PutKinesisStream::Failure).empty()); + REQUIRE(result.at(PutKinesisStream::Success).size() == 2); + const auto res_ff_1 = result.at(PutKinesisStream::Success).at(0); + const auto res_ff_2 = result.at(PutKinesisStream::Success).at(1); + + CHECK(controller.plan->getContent(res_ff_1) == "foo"); + CHECK(controller.plan->getContent(res_ff_2) == "bar"); + + CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_1"); + CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id"); + CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_2"); + CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id"); +} + +TEST_CASE("PutKinesisStream smaller batch size than available ffs") { + minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = controller.getProcessor(); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MessageBatchSize, "10"); + + const auto result = controller.trigger({ + {.content = "Lorem"}, + {.content = "ipsum"}, + {.content = "dolor"}, + {.content = "sit"}, + {.content = "amet"}, + {.content = "consectetur"}, + {.content = "adipiscing"}, + {.content = "elit"}, + {.content = "Morbi"}, + {.content = "dapibus"}, + {.content = "risus"}, + {.content = "a"}, + {.content = "bibendum"}, + {.content = "luctus"}}); + + CHECK(result.at(PutKinesisStream::Success).size() == 10); +} + +TEST_CASE("PutKinesisStream max batch data size fills up") { + minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = controller.getProcessor(); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, 
"stream_name"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MessageBatchSize, "10"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MaxBatchDataSize, "12 B"); + + const auto result = controller.trigger({ + {.content = "Lorem"}, + {.content = "ipsum"}, + {.content = "dolor"}, + {.content = "sit"}, + {.content = "amet"}, + {.content = "consectetur"}, + {.content = "adipiscing"}, + {.content = "elit"}, + {.content = "Morbi"}, + {.content = "dapibus"}, + {.content = "risus"}, + {.content = "a"}, + {.content = "bibendum"}, + {.content = "luctus"}}); + + REQUIRE(result.at(PutKinesisStream::Success).size() == 3); + CHECK(controller.plan->getContent(result.at(PutKinesisStream::Success).at(0)) == "Lorem"); + CHECK(controller.plan->getContent(result.at(PutKinesisStream::Success).at(1)) == "ipsum"); + CHECK(controller.plan->getContent(result.at(PutKinesisStream::Success).at(2)) == "dolor"); +} + +TEST_CASE("PutKinesisStream max batch data size to different streams") { + minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = controller.getProcessor(); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MessageBatchSize, "10"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MaxBatchDataSize, "12 B"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "${stream_name}"); + + const auto result = controller.trigger({ + {.content = "Lorem", .attributes = {{"stream_name", "stream_one"}}}, + {.content = "ipsum", .attributes = {{"stream_name", "stream_two"}}}, + {.content = "dolor", .attributes = {{"stream_name", "stream_three"}}}, + {.content = "sit", .attributes = {{"stream_name", "stream_four"}}}, + {.content = "amet", .attributes = {{"stream_name", "stream_five"}}}, + {.content = "consectetur", .attributes = {{"stream_name", "stream_six"}}}, + {.content = "adipiscing", .attributes = {{"stream_name", "stream_seven"}}}, + {.content = "elit", .attributes = {{"stream_name", "stream_eight"}}}, + {.content = "Morbi", .attributes = {{"stream_name", "stream_nine"}}}, + {.content = "dapibus", .attributes = {{"stream_name", "stream_ten"}}}, + {.content = "risus", .attributes = {{"stream_name", "stream_eleven"}}}, + {.content = "a", .attributes = {{"stream_name", "stream_twelve"}}}, + {.content = "bibendum", .attributes = {{"stream_name", "stream_thirteen"}}}, + {.content = "luctus", .attributes = {{"stream_name", "stream_fourteen"}}}}); + + CHECK(result.at(PutKinesisStream::Success).size() == 10); +} + +TEST_CASE("PutKinesisStream with too large message") { + minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = controller.getProcessor(); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); + controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name"); + + std::string too_large_msg((1_MB + 10), 'x'); + const auto result = controller.trigger(too_large_msg); + REQUIRE(result.at(PutKinesisStream::Failure).size() == 1); + 
CHECK(result.at(PutKinesisStream::Success).empty()); + + const auto res_ff_1 = result.at(PutKinesisStream::Failure).at(0); + + CHECK(controller.plan->getContent(res_ff_1) == too_large_msg); + + CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "record too big 1000010, max allowed 1000000"); + CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == std::nullopt); +} + +} // namespace org::apache::nifi::minifi::aws::processors::test diff --git a/extensions/aws/utils/AWSInitializer.cpp b/extensions/aws/utils/AWSInitializer.cpp index b870e2d3a..08dcb161a 100755 --- a/extensions/aws/utils/AWSInitializer.cpp +++ b/extensions/aws/utils/AWSInitializer.cpp @@ -1,6 +1,4 @@ /** - * @file AWSInitializer.cpp - * AWSInitializer class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -28,12 +26,7 @@ #include "aws/core/platform/Environment.h" #include "AWSSdkLogger.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace aws { -namespace utils { +namespace org::apache::nifi::minifi::aws::utils { AWSInitializer& AWSInitializer::get() { static AWSInitializer instance; @@ -47,13 +40,7 @@ AWSInitializer::~AWSInitializer() { AWSInitializer::AWSInitializer() { Aws::InitAPI(options_); - Aws::Utils::Logging::InitializeAWSLogging( - std::make_shared<AWSSdkLogger>()); + Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSSdkLogger>()); } -} // namespace utils -} // namespace aws -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::aws::utils diff --git a/extensions/aws/utils/AWSInitializer.h b/extensions/aws/utils/AWSInitializer.h index 0720b7a8e..34b3f2f38 100755 --- a/extensions/aws/utils/AWSInitializer.h +++ b/extensions/aws/utils/AWSInitializer.h @@ -1,6 +1,4 @@ /** - * @file AWSInitializer.h - * Initializing AWS SDK * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with diff --git a/extensions/aws/utils/AWSSdkLogger.cpp b/extensions/aws/utils/AWSSdkLogger.cpp index cb67e4551..1514c3f31 100644 --- a/extensions/aws/utils/AWSSdkLogger.cpp +++ b/extensions/aws/utils/AWSSdkLogger.cpp @@ -1,6 +1,4 @@ /** - * @file AWSSdkLogger.cpp - * AWSSdkLogger class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with diff --git a/extensions/aws/utils/AWSSdkLogger.h b/extensions/aws/utils/AWSSdkLogger.h index 536336eb0..af8b2529d 100644 --- a/extensions/aws/utils/AWSSdkLogger.h +++ b/extensions/aws/utils/AWSSdkLogger.h @@ -1,6 +1,4 @@ /** - * @file AWSSdkLogger.h - * AWS SDK Logger class * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with diff --git a/extensions/aws/utils/ProxyOptions.h b/extensions/aws/utils/ProxyOptions.h new file mode 100644 index 000000000..8227c93b0 --- /dev/null +++ b/extensions/aws/utils/ProxyOptions.h @@ -0,0 +1,33 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string> +#include <cstdint> + + +namespace org::apache::nifi::minifi::aws { + +struct ProxyOptions { + std::string host; + uint32_t port = 0; + std::string username; + std::string password; +}; + +} // namespace org::apache::nifi::minifi::aws diff --git a/thirdparty/aws-sdk-cpp/dll-export-injection.patch b/thirdparty/aws-sdk-cpp/dll-export-injection.patch index 1343f00c7..b73a91788 100644 --- a/thirdparty/aws-sdk-cpp/dll-export-injection.patch +++ b/thirdparty/aws-sdk-cpp/dll-export-injection.patch @@ -1,45 +1,17 @@ -diff -rupN a/generated/src/aws-cpp-sdk-s3/CMakeLists.txt b/generated/src/aws-cpp-sdk-s3/CMakeLists.txt ---- a/generated/src/aws-cpp-sdk-s3/CMakeLists.txt 2023-12-11 13:15:50.741732410 +0100 -+++ b/generated/src/aws-cpp-sdk-s3/CMakeLists.txt 2023-12-14 14:02:41.247487265 +0100 -@@ -59,6 +59,11 @@ if(USE_WINDOWS_DLL_SEMANTICS AND BUILD_S - target_compile_definitions(${PROJECT_NAME} PRIVATE "AWS_S3_EXPORTS") - endif() - -+if(FORCE_EXPORT_S3_API) -+ target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_S3_API=__declspec(dllexport)") -+ target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_S3_EXTERN=") -+endif() -+ - target_include_directories(${PROJECT_NAME} PUBLIC - $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> - $<INSTALL_INTERFACE:include>) -diff -rupN a/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h b/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h ---- a/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h 2023-12-11 13:15:50.741732410 +0100 -+++ b/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h 2023-12-14 13:30:54.022259949 +0100 -@@ -22,10 +22,13 @@ - #define AWS_S3_API __declspec(dllimport) - #endif /* AWS_S3_EXPORTS */ - #define AWS_S3_EXTERN -- #else -- #define AWS_S3_API -- #define AWS_S3_EXTERN extern - #endif // USE_IMPORT_EXPORT -+ #ifndef AWS_S3_API -+ #define AWS_S3_API -+ #endif -+ #ifndef AWS_S3_EXTERN -+ #define AWS_S3_EXTERN -+ #endif - #else // defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) - #define AWS_S3_API - #define AWS_S3_EXTERN extern -diff -rupN a/src/aws-cpp-sdk-core/CMakeLists.txt b/src/aws-cpp-sdk-core/CMakeLists.txt ---- a/src/aws-cpp-sdk-core/CMakeLists.txt 2023-12-11 13:15:52.061754319 +0100 -+++ b/src/aws-cpp-sdk-core/CMakeLists.txt 2023-12-14 14:01:42.666518935 +0100 -@@ -682,6 +682,11 @@ elseif (BUILD_SHARED_LIBS) +Subject: [PATCH] dll-export-injection +--- +Index: src/aws-cpp-sdk-core/CMakeLists.txt +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/src/aws-cpp-sdk-core/CMakeLists.txt b/src/aws-cpp-sdk-core/CMakeLists.txt +--- a/src/aws-cpp-sdk-core/CMakeLists.txt (revision 101ff00c3e4f5248673e853617d1731fbf844112) ++++ b/src/aws-cpp-sdk-core/CMakeLists.txt (date 1743761315080) +@@ -628,6 +628,11 @@ target_compile_definitions(${PROJECT_NAME} PRIVATE 
"SMITHY_EXPORTS=1") endif() - + +if(FORCE_EXPORT_CORE_API) + target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_CORE_API=__declspec(dllexport)") + target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_CORE_EXTERN=") @@ -47,10 +19,15 @@ diff -rupN a/src/aws-cpp-sdk-core/CMakeLists.txt b/src/aws-cpp-sdk-core/CMakeLis + set_compiler_flags(${PROJECT_NAME}) set_compiler_warnings(${PROJECT_NAME}) - -diff -rupN a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h b/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h ---- a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h 2023-12-11 13:15:52.061754319 +0100 -+++ b/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h 2023-12-14 13:31:41.699706791 +0100 + +Index: src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h b/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h +--- a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h (revision 101ff00c3e4f5248673e853617d1731fbf844112) ++++ b/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h (date 1743761315080) @@ -19,10 +19,13 @@ #define AWS_CORE_API __declspec(dllimport) #endif // AWS_CORE_EXPORTS @@ -68,25 +45,125 @@ diff -rupN a/src/aws-cpp-sdk-core/include/aws/core/Core_EXPORTS.h b/src/aws-cpp- #define AWS_CORE_LOCAL #else // defined (USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32) #define AWS_CORE_API -diff -rupN a/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h b/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h ---- a/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h 2023-12-11 13:15:52.065087708 +0100 -+++ b/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h 2023-12-14 10:53:36.150592052 +0100 +Index: generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h b/generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h +--- a/generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h (revision 101ff00c3e4f5248673e853617d1731fbf844112) ++++ b/generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/Kinesis_EXPORTS.h (date 1743763015621) +@@ -22,10 +22,13 @@ + #define AWS_KINESIS_API __declspec(dllimport) + #endif /* AWS_KINESIS_EXPORTS */ + #define AWS_KINESIS_EXTERN +- #else +- #define AWS_KINESIS_API +- #define AWS_KINESIS_EXTERN extern + #endif // USE_IMPORT_EXPORT ++ #ifndef AWS_KINESIS_API ++ #define AWS_KINESIS_API ++ #endif ++ #ifndef AWS_KINESIS_EXTERN ++ #define AWS_KINESIS_EXTERN ++ #endif + #else // defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) + #define AWS_KINESIS_API + #define AWS_KINESIS_EXTERN extern +Index: generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h b/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h +--- a/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h (revision 101ff00c3e4f5248673e853617d1731fbf844112) 
++++ b/generated/src/aws-cpp-sdk-s3/include/aws/s3/S3_EXPORTS.h (date 1743761315080) +@@ -22,10 +22,13 @@ + #define AWS_S3_API __declspec(dllimport) + #endif /* AWS_S3_EXPORTS */ + #define AWS_S3_EXTERN +- #else +- #define AWS_S3_API +- #define AWS_S3_EXTERN extern + #endif // USE_IMPORT_EXPORT ++ #ifndef AWS_S3_API ++ #define AWS_S3_API ++ #endif ++ #ifndef AWS_S3_EXTERN ++ #define AWS_S3_EXTERN ++ #endif + #else // defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) + #define AWS_S3_API + #define AWS_S3_EXTERN extern +Index: generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt b/generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt +--- a/generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt (revision 101ff00c3e4f5248673e853617d1731fbf844112) ++++ b/generated/src/aws-cpp-sdk-kinesis/CMakeLists.txt (date 1743761496232) +@@ -59,6 +59,11 @@ + target_compile_definitions(${PROJECT_NAME} PRIVATE "AWS_KINESIS_EXPORTS") + endif() + ++if(FORCE_EXPORT_KINESIS_API) ++ target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_KINESIS_API=__declspec(dllexport)") ++ target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_KINESIS_EXTERN=") ++endif() ++ + target_include_directories(${PROJECT_NAME} PUBLIC + $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> + $<INSTALL_INTERFACE:include>) +Index: src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h b/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h +--- a/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h (revision 101ff00c3e4f5248673e853617d1731fbf844112) ++++ b/src/aws-cpp-sdk-core/include/aws/core/endpoint/DefaultEndpointProvider.h (date 1743761315080) @@ -6,6 +6,7 @@ - + #pragma once - + +#include <aws/core/Core_EXPORTS.h> #include <aws/core/endpoint/AWSPartitions.h> #include <aws/core/endpoint/EndpointProviderBase.h> #include <aws/core/endpoint/EndpointParameter.h> -diff -rupN a/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h b/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h ---- a/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h 2023-12-11 13:15:52.065087708 +0100 -+++ b/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h 2023-12-14 10:53:36.150592052 +0100 +Index: src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h b/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h +--- a/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h (revision 101ff00c3e4f5248673e853617d1731fbf844112) ++++ b/src/aws-cpp-sdk-core/include/aws/core/endpoint/EndpointProviderBase.h (date 1743761315080) @@ -6,6 +6,7 @@ - + #pragma once - + +#include <aws/core/Core_EXPORTS.h> #include <aws/core/endpoint/AWSEndpoint.h> #include <aws/core/client/AWSError.h> #include 
<aws/core/endpoint/EndpointParameter.h> +Index: generated/src/aws-cpp-sdk-s3/CMakeLists.txt +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/generated/src/aws-cpp-sdk-s3/CMakeLists.txt b/generated/src/aws-cpp-sdk-s3/CMakeLists.txt +--- a/generated/src/aws-cpp-sdk-s3/CMakeLists.txt (revision 101ff00c3e4f5248673e853617d1731fbf844112) ++++ b/generated/src/aws-cpp-sdk-s3/CMakeLists.txt (date 1743761315080) +@@ -59,6 +59,11 @@ + target_compile_definitions(${PROJECT_NAME} PRIVATE "AWS_S3_EXPORTS") + endif() + ++if(FORCE_EXPORT_S3_API) ++ target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_S3_API=__declspec(dllexport)") ++ target_compile_definitions(${PROJECT_NAME} PUBLIC "AWS_S3_EXTERN=") ++endif() ++ + target_include_directories(${PROJECT_NAME} PUBLIC + $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> + $<INSTALL_INTERFACE:include>)
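
For context, here is a minimal sketch of how the new ProxyOptions struct introduced above could be copied onto an AWS SDK client configuration. The proxyHost/proxyPort/proxyUserName/proxyPassword members are part of Aws::Client::ClientConfiguration in the AWS SDK for C++; the helper name applyProxy and the ProxyOptions.h include path are illustrative only and are not taken from this commit.

#include <aws/core/client/ClientConfiguration.h>

#include "utils/ProxyOptions.h"  // illustrative include path

namespace org::apache::nifi::minifi::aws {

// Copies the proxy settings onto the SDK client configuration;
// does nothing when no proxy host is configured.
inline void applyProxy(const ProxyOptions& proxy, Aws::Client::ClientConfiguration& client_config) {
  if (proxy.host.empty()) {
    return;
  }
  client_config.proxyHost = proxy.host.c_str();
  client_config.proxyPort = proxy.port;
  if (!proxy.username.empty()) {
    client_config.proxyUserName = proxy.username.c_str();
    client_config.proxyPassword = proxy.password.c_str();
  }
}

}  // namespace org::apache::nifi::minifi::aws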
