This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 4876781170eb9203ef6d499aa4d952410986d0df Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Sep 22 16:14:04 2021 +0200 MINIFICPP-1616 Add Azure DataLake storage lib to minifi - Updated Azure SDK to version 12.2.0 - Removed now unused dependencies of Azure SDK - Added PutAzureDataLakeStorage for uploading files to Azure Data Lake Gen2 storage Unfortunately Azure Data Lake Storage is not supported by the Azurite docker image, so the system testing was only done manually. Closes #1158 Signed-off-by: Marton Szasz <[email protected]> --- .github/workflows/ci.yml | 2 +- CMakeLists.txt | 3 - LICENSE | 25 -- NOTICE | 1 - PROCESSORS.md | 26 ++ README.md | 2 +- cmake/BundledAzureSdkCpp.cmake | 57 ++--- cmake/NlohmannJson.cmake | 35 --- cmake/nlohmann_json/dummy/Findnlohmann_json.cmake | 27 -- extensions/aws/processors/DeleteS3Object.cpp | 5 +- extensions/aws/processors/FetchS3Object.cpp | 5 +- extensions/aws/processors/ListS3.cpp | 6 +- extensions/aws/processors/PutS3Object.cpp | 7 +- extensions/aws/processors/S3Processor.cpp | 14 +- extensions/azure/CMakeLists.txt | 2 +- .../AzureStorageCredentialsService.h | 11 +- .../azure/processors/AzureStorageProcessorBase.cpp | 56 +++++ .../azure/processors/AzureStorageProcessorBase.h | 51 ++++ .../azure/processors/PutAzureBlobStorage.cpp | 48 +--- extensions/azure/processors/PutAzureBlobStorage.h | 38 +-- .../azure/processors/PutAzureDataLakeStorage.cpp | 177 ++++++++++++++ .../azure/processors/PutAzureDataLakeStorage.h | 103 ++++++++ extensions/azure/storage/AzureBlobStorage.cpp | 9 +- extensions/azure/storage/AzureDataLakeStorage.cpp | 54 ++++ extensions/azure/storage/AzureDataLakeStorage.h | 56 +++++ .../azure/storage/AzureDataLakeStorageClient.cpp | 55 +++++ .../azure/storage/AzureDataLakeStorageClient.h | 59 +++++ extensions/azure/storage/DataLakeStorageClient.h | 43 ++++ .../azure-tests/PutAzureDataLakeStorageTests.cpp | 271 +++++++++++++++++++++ .../azure-sdk-for-cpp-old-compiler.patch | 42 ---- 
.../fix-illegal-qualified-name-in-member.patch | 14 -- .../azure-sdk-cpp-remove-samples.patch | 23 ++ 32 files changed, 1042 insertions(+), 285 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c1a9769..efbf11e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: export LDFLAGS="-L/usr/local/opt/flex/lib" export CPPFLAGS="-I/usr/local/opt/flex/include" # CPPFLAGS are not recognized by cmake, so we have to force them to CFLAGS and CXXFLAGS to have flex 2.6 working - ./bootstrap.sh -e -t && cd build && cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="${CPPFLAGS} ${CFLAGS}" -DCMAKE_CXX_FLAGS="${CPPFLAGS} ${CXXFLAGS}" -DENABLE_LUA_SCRIPTING=ON -DENABLE_SQL=ON -DUSE_REAL_ODBC_TEST_DRIVER=ON -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && cmake --build . --parallel 4 + ./bootstrap.sh -e -t && cd build && cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="${CPPFLAGS} ${CFLAGS}" -DCMAKE_CXX_FLAGS="${CPPFLAGS} ${CXXFLAGS}" -DENABLE_LUA_SCRIPTING=ON -DENABLE_SQL=ON -DUSE_REAL_ODBC_TEST_DRIVER=ON -DENABLE_AZURE=ON -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && cmake --build . 
--parallel 4 - name: test run: cd build && make test ARGS="--timeout 300 -j4 --output-on-failure" - name: linter diff --git a/CMakeLists.txt b/CMakeLists.txt index ca0b484..88f3443 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -581,9 +581,6 @@ endif() ## Azure Extensions if (ENABLE_ALL OR ENABLE_AZURE) - include(NlohmannJson) - list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/nlohmann_json/dummy") - include(BundledAzureSdkCpp) use_bundled_libazure(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) createExtension(AZURE-EXTENSIONS "AZURE EXTENSIONS" "This enables Azure support" "extensions/azure" "${TEST_DIR}/azure-tests") diff --git a/LICENSE b/LICENSE index 4416258..5bcf14d 100644 --- a/LICENSE +++ b/LICENSE @@ -2897,31 +2897,6 @@ SOFTWARE -------------------------------------------------------------------------- -This product bundles 'JSON for Modern C++' which is available under a MIT license: -MIT License - -Copyright (c) 2013-2021 Niels Lohmann - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. - --------------------------------------------------------------------------- - This project bundles 'range-v3' which is available under the Boost Software License. It includes code from a number of other projects, all of which have their own BSD-like or MIT licenses. diff --git a/NOTICE b/NOTICE index 0a875f6..b8cf4a2 100644 --- a/NOTICE +++ b/NOTICE @@ -57,7 +57,6 @@ This software includes third party software subject to the following copyrights: - libsodium - Copyright (c) 2013 - 2018 Frank Denis under the ISC software license - IANA timezone database - public domain - date (HowardHinnant/date) - notices below -- JSON for Modern C++ (nlohmann/json) - Copyright (c) 2013-2021 Niels Lohmann - range-v3 - Eric Niebler and other contributors The licenses for these third party components are included in LICENSE.txt diff --git a/PROCESSORS.md b/PROCESSORS.md index 4127414..47436d0 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -42,6 +42,7 @@ - [PublishKafka](#publishkafka) - [PublishMQTT](#publishmqtt) - [PutAzureBlobStorage](#putazureblobstorage) +- [PutAzureDataLakeStorage](#putazuredatalakestorage) - [PutFile](#putfile) - [PutOPCProcessor](#putopcprocessor) - [PutS3Object](#puts3object) @@ -1238,6 +1239,31 @@ In the list below, the names of required properties appear in bold. Any other pr |success|All successfully processed FlowFiles are routed to this relationship| +## PutAzureDataLakeStorage + +### Description + +Puts content into an Azure Data Lake Storage Gen 2 +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. 
The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +| - | - | - | - | +|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.| +|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**| +|Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**| +|File Name|||The filename to be uploaded. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**| +|**Conflict Resolution Strategy**|fail|fail<br/>replace<br/>ignore|Indicates what should happen when a file with the same name already exists in the output directory.| + +### Relationships + +| Name | Description | +| - | - | +|failure|Files that could not be written to Azure storage for some reason are transferred to this relationship| +|success|Files that have been successfully written to Azure storage are transferred to this relationship| + + ## PutFile ### Description diff --git a/README.md b/README.md index 117662c..de23895 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ Through JNI extensions you can run NiFi processors using NARs. 
The JNI extension | ------------- |:-------------| :-----| | Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) | -DBUILD_LIBARCHIVE=ON | | AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON | -| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage) | -DENABLE_AZURE=ON | +| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage) | -DENABLE_AZURE=ON | | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON | | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON | | GPS | GetGPS | -DENABLE_GPS=ON | diff --git a/cmake/BundledAzureSdkCpp.cmake b/cmake/BundledAzureSdkCpp.cmake index 939115a..05bea72 100644 --- a/cmake/BundledAzureSdkCpp.cmake +++ b/cmake/BundledAzureSdkCpp.cmake @@ -16,13 +16,9 @@ # under the License.
function(use_bundled_libazure SOURCE_DIR BINARY_DIR) - set(PATCH_FILE1 "${SOURCE_DIR}/thirdparty/azure-sdk-cpp-for-cpp/azure-sdk-for-cpp-old-compiler.patch") - set(PATCH_FILE2 "${SOURCE_DIR}/thirdparty/azure-sdk-cpp-for-cpp/fix-illegal-qualified-name-in-member.patch") - set(PC ${Bash_EXECUTABLE} -c "set -x &&\ - (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE1}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE1}\") &&\ - (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE2}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE2}\") ") - - + set(PATCH_FILE "${SOURCE_DIR}/thirdparty/azure-sdk-cpp/azure-sdk-cpp-remove-samples.patch") + set(PC ${Bash_EXECUTABLE} -c "set -x && \ + (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE}\")") # Define byproducts if (WIN32) set(SUFFIX "lib") @@ -31,6 +27,7 @@ function(use_bundled_libazure SOURCE_DIR BINARY_DIR) set(AZURE_STORAGE_COMMON_LIB "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/storage/azure-storage-common/${CMAKE_BUILD_TYPE}/${PREFIX}azure-storage-common.${SUFFIX}") set(AZURE_STORAGE_BLOBS_LIB "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/storage/azure-storage-blobs/${CMAKE_BUILD_TYPE}/${PREFIX}azure-storage-blobs.${SUFFIX}") set(AZURE_IDENTITY_LIB "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/identity/azure-identity/${CMAKE_BUILD_TYPE}/${PREFIX}azure-identity.${SUFFIX}") + set(AZURE_STORAGE_FILES_DATALAKE_LIB "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/storage/azure-storage-files-datalake/${CMAKE_BUILD_TYPE}/${PREFIX}azure-storage-files-datalake.${SUFFIX}") else() set(SUFFIX "a") set(PREFIX "lib") @@ -38,13 +35,15 @@ function(use_bundled_libazure SOURCE_DIR BINARY_DIR) set(AZURE_STORAGE_COMMON_LIB "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/storage/azure-storage-common/${PREFIX}azure-storage-common.${SUFFIX}") set(AZURE_STORAGE_BLOBS_LIB 
"${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/storage/azure-storage-blobs/${PREFIX}azure-storage-blobs.${SUFFIX}") set(AZURE_IDENTITY_LIB "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/identity/azure-identity/${PREFIX}azure-identity.${SUFFIX}") + set(AZURE_STORAGE_FILES_DATALAKE_LIB "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/storage/azure-storage-files-datalake/${PREFIX}azure-storage-files-datalake.${SUFFIX}") endif() set(AZURESDK_LIBRARIES_LIST "${AZURE_CORE_LIB}" "${AZURE_STORAGE_COMMON_LIB}" "${AZURE_STORAGE_BLOBS_LIB}" - "${AZURE_IDENTITY_LIB}") + "${AZURE_IDENTITY_LIB}" + "${AZURE_STORAGE_FILES_DATALAKE_LIB}") set(AZURE_SDK_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS} -DWARNINGS_AS_ERRORS=OFF) @@ -53,8 +52,8 @@ function(use_bundled_libazure SOURCE_DIR BINARY_DIR) # Build project ExternalProject_Add( azure-sdk-cpp-external - GIT_REPOSITORY "https://github.com/Azure/azure-sdk-for-cpp.git" - GIT_TAG "azure-storage-blobs_12.0.0-beta.7" + URL https://github.com/Azure/azure-sdk-for-cpp/archive/refs/tags/azure-storage-files-datalake_12.2.0.tar.gz + URL_HASH "SHA256=d4e80ea5e786dc689ddd04825d97ab91f5e1ef2787fa88a3d5ee00f0b820433f" BUILD_IN_SOURCE true SOURCE_DIR "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src" BUILD_BYPRODUCTS "${AZURESDK_LIBRARIES_LIST}" @@ -66,7 +65,7 @@ function(use_bundled_libazure SOURCE_DIR BINARY_DIR) ) # Set dependencies - add_dependencies(azure-sdk-cpp-external-build CURL::libcurl LibXml2::LibXml2 OpenSSL::Crypto OpenSSL::SSL nlohmann_json::nlohmann_json) + add_dependencies(azure-sdk-cpp-external-build CURL::libcurl LibXml2::LibXml2 OpenSSL::Crypto OpenSSL::SSL) # Set variables set(LIBAZURE_FOUND "YES" CACHE STRING "" FORCE) @@ -75,6 +74,7 @@ function(use_bundled_libazure SOURCE_DIR BINARY_DIR) "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/storage/azure-storage-blobs/inc/" "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/storage/azure-storage-common/inc/" "${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/identity/azure-identity/inc/" + 
"${BINARY_DIR}/thirdparty/azure-sdk-cpp-src/sdk/storage/azure-storage-files-datalake/inc/" CACHE STRING "" FORCE) set(LIBAZURE_LIBRARIES ${AZURESDK_LIBRARIES_LIST} CACHE STRING "" FORCE) @@ -87,48 +87,29 @@ function(use_bundled_libazure SOURCE_DIR BINARY_DIR) set_target_properties(AZURE::azure-core PROPERTIES IMPORTED_LOCATION "${AZURE_CORE_LIB}") add_dependencies(AZURE::azure-core azure-sdk-cpp-external-build) target_include_directories(AZURE::azure-core INTERFACE ${LIBAZURE_INCLUDE_DIRS}) - target_link_libraries(AZURE::azure-core INTERFACE LibXml2::LibXml2 CURL::libcurl OpenSSL::Crypto OpenSSL::SSL Threads::Threads nlohmann_json::nlohmann_json) - if (APPLE) - target_link_libraries(AZURE::azure-core INTERFACE "-framework CoreFoundation") - endif() + target_link_libraries(AZURE::azure-core INTERFACE CURL::libcurl OpenSSL::Crypto OpenSSL::SSL) if (WIN32) - target_link_libraries(AZURE::azure-core INTERFACE winhttp.lib) + target_link_libraries(AZURE::azure-core INTERFACE winhttp.lib WebServices.lib) endif() add_library(AZURE::azure-identity STATIC IMPORTED) set_target_properties(AZURE::azure-identity PROPERTIES IMPORTED_LOCATION "${AZURE_IDENTITY_LIB}") add_dependencies(AZURE::azure-identity azure-sdk-cpp-external-build) target_include_directories(AZURE::azure-identity INTERFACE ${LIBAZURE_INCLUDE_DIRS}) - target_link_libraries(AZURE::azure-identity INTERFACE LibXml2::LibXml2 CURL::libcurl OpenSSL::Crypto OpenSSL::SSL Threads::Threads nlohmann_json::nlohmann_json) - if (APPLE) - target_link_libraries(AZURE::azure-identity INTERFACE "-framework CoreFoundation") - endif() - if (WIN32) - target_link_libraries(AZURE::azure-identity INTERFACE winhttp.lib) - endif() add_library(AZURE::azure-storage-common STATIC IMPORTED) set_target_properties(AZURE::azure-storage-common PROPERTIES IMPORTED_LOCATION "${AZURE_STORAGE_COMMON_LIB}") add_dependencies(AZURE::azure-storage-common azure-sdk-cpp-external-build) target_include_directories(AZURE::azure-storage-common INTERFACE 
${LIBAZURE_INCLUDE_DIRS}) - target_link_libraries(AZURE::azure-storage-common INTERFACE LibXml2::LibXml2 CURL::libcurl OpenSSL::Crypto OpenSSL::SSL Threads::Threads nlohmann_json::nlohmann_json) - if (APPLE) - target_link_libraries(AZURE::azure-storage-common INTERFACE "-framework CoreFoundation") - endif() - if (WIN32) - target_link_libraries(AZURE::azure-storage-common INTERFACE winhttp.lib) - endif() + target_link_libraries(AZURE::azure-storage-common INTERFACE LibXml2::LibXml2) add_library(AZURE::azure-storage-blobs STATIC IMPORTED) set_target_properties(AZURE::azure-storage-blobs PROPERTIES IMPORTED_LOCATION "${AZURE_STORAGE_BLOBS_LIB}") add_dependencies(AZURE::azure-storage-blobs azure-sdk-cpp-external-build) target_include_directories(AZURE::azure-storage-blobs INTERFACE ${LIBAZURE_INCLUDE_DIRS}) - target_link_libraries(AZURE::azure-storage-blobs INTERFACE LibXml2::LibXml2 CURL::libcurl OpenSSL::Crypto OpenSSL::SSL Threads::Threads nlohmann_json::nlohmann_json) - if (APPLE) - target_link_libraries(AZURE::azure-storage-blobs INTERFACE "-framework CoreFoundation") - endif() - if (WIN32) - target_link_libraries(AZURE::azure-storage-blobs INTERFACE winhttp.lib) - endif() - add_definitions("-DBUILD_CURL_HTTP_TRANSPORT_ADAPTER") + + add_library(AZURE::azure-storage-files-datalake STATIC IMPORTED) + set_target_properties(AZURE::azure-storage-files-datalake PROPERTIES IMPORTED_LOCATION "${AZURE_STORAGE_FILES_DATALAKE_LIB}") + add_dependencies(AZURE::azure-storage-files-datalake azure-sdk-cpp-external-build) + target_include_directories(AZURE::azure-storage-files-datalake INTERFACE ${LIBAZURE_INCLUDE_DIRS}) endfunction(use_bundled_libazure) diff --git a/cmake/NlohmannJson.cmake b/cmake/NlohmannJson.cmake deleted file mode 100644 index 3b1f381..0000000 --- a/cmake/NlohmannJson.cmake +++ /dev/null @@ -1,35 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -include(FetchContent) - -FetchContent_Declare(nlohmann_json - GIT_REPOSITORY https://github.com/ArthurSonzogni/nlohmann_json_cmake_fetchcontent - GIT_TAG "v3.9.1") - -FetchContent_MakeAvailable(nlohmann_json) - -FetchContent_GetProperties(nlohmann_json) -if(NOT nlohmann_json_POPULATED) - FetchContent_Populate(nlohmann_json) - add_subdirectory(${nlohmann_json_SOURCE_DIR} ${nlohmann_json_BINARY_DIR} EXCLUDE_FROM_ALL) -endif() - -set(NLOHMANN_JSON_INCLUDE_DIR "${nlohmann_json_SOURCE_DIR}/include") - -# Set exported variables for FindPackage.cmake -set(PASSTHROUGH_VARIABLES ${PASSTHROUGH_VARIABLES} "-DEXPORTED_NLOHMANN_JSON_INCLUDE_DIR=${NLOHMANN_JSON_INCLUDE_DIR}" CACHE STRING "" FORCE) diff --git a/cmake/nlohmann_json/dummy/Findnlohmann_json.cmake b/cmake/nlohmann_json/dummy/Findnlohmann_json.cmake deleted file mode 100644 index 6a3cb1b..0000000 --- a/cmake/nlohmann_json/dummy/Findnlohmann_json.cmake +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -if(NOT NLOHMANN_JSON_FOUND) - set(NLOHMANN_JSON_FOUND "YES" CACHE STRING "" FORCE) - set(NLOHMANN_JSON_INCLUDE_DIR "${EXPORTED_NLOHMANN_JSON_INCLUDE_DIR}" CACHE STRING "" FORCE) - set(NLOHMANN_JSON_INCLUDE_DIRS "${EXPORTED_NLOHMANN_JSON_INCLUDE_DIR}" CACHE STRING "" FORCE) -endif() - -if(NOT TARGET nlohmann_json::nlohmann_json) - add_library(nlohmann_json::nlohmann_json STATIC IMPORTED) - set_property(TARGET nlohmann_json::nlohmann_json APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES "${NLOHMANN_JSON_INCLUDE_DIR}") -endif() diff --git a/extensions/aws/processors/DeleteS3Object.cpp b/extensions/aws/processors/DeleteS3Object.cpp index 2159e8a..c16a28a 100644 --- a/extensions/aws/processors/DeleteS3Object.cpp +++ b/extensions/aws/processors/DeleteS3Object.cpp @@ -48,7 +48,8 @@ const core::Relationship DeleteS3Object::Failure("failure", "FlowFiles are route void DeleteS3Object::initialize() { // Add new supported properties - updateSupportedProperties({ObjectKey, Version}); + setSupportedProperties({Bucket, AccessKey, SecretKey, CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout, + EndpointOverrideURL, ProxyHost, ProxyPort, ProxyUsername, ProxyPassword, UseDefaultCredentials, ObjectKey, Version}); // Set the supported relationships setSupportedRelationships({Failure, Success}); } @@ -74,7 +75,7 @@ std::optional<aws::s3::DeleteObjectRequestParameters> 
DeleteS3Object::buildDelet } void DeleteS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - logger_->log_debug("DeleteS3Object onTrigger"); + logger_->log_trace("DeleteS3Object onTrigger"); std::shared_ptr<core::FlowFile> flow_file = session->get(); if (!flow_file) { context->yield(); diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp index 8bf25aa..c7de905 100644 --- a/extensions/aws/processors/FetchS3Object.cpp +++ b/extensions/aws/processors/FetchS3Object.cpp @@ -55,7 +55,8 @@ const core::Relationship FetchS3Object::Failure("failure", "FlowFiles are routed void FetchS3Object::initialize() { // Add new supported properties - updateSupportedProperties({ObjectKey, Version, RequesterPays}); + setSupportedProperties({Bucket, AccessKey, SecretKey, CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout, + EndpointOverrideURL, ProxyHost, ProxyPort, ProxyUsername, ProxyPassword, UseDefaultCredentials, ObjectKey, Version, RequesterPays}); // Set the supported relationships setSupportedRelationships({Failure, Success}); } @@ -89,7 +90,7 @@ std::optional<aws::s3::GetObjectRequestParameters> FetchS3Object::buildFetchS3Re } void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - logger_->log_debug("FetchS3Object onTrigger"); + logger_->log_trace("FetchS3Object onTrigger"); std::shared_ptr<core::FlowFile> flow_file = session->get(); if (!flow_file) { context->yield(); diff --git a/extensions/aws/processors/ListS3.cpp b/extensions/aws/processors/ListS3.cpp index 2ec75de..b5cc1a7 100644 --- a/extensions/aws/processors/ListS3.cpp +++ b/extensions/aws/processors/ListS3.cpp @@ -83,7 +83,9 @@ const core::Relationship ListS3::Success("success", "FlowFiles are routed to suc void ListS3::initialize() { // Add new supported 
properties - updateSupportedProperties({Delimiter, Prefix, UseVersions, MinimumObjectAge, WriteObjectTags, WriteUserMetadata, RequesterPays}); + setSupportedProperties({Bucket, AccessKey, SecretKey, CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout, + EndpointOverrideURL, ProxyHost, ProxyPort, ProxyUsername, ProxyPassword, UseDefaultCredentials, Delimiter, Prefix, UseVersions, + MinimumObjectAge, WriteObjectTags, WriteUserMetadata, RequesterPays}); // Set the supported relationships setSupportedRelationships({Success}); } @@ -243,7 +245,7 @@ void ListS3::createNewFlowFile( } void ListS3::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - logger_->log_debug("ListS3 onTrigger"); + logger_->log_trace("ListS3 onTrigger"); auto aws_results = s3_wrapper_.listBucket(*list_request_params_); if (!aws_results) { diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp index 093850b..f20d5b5 100644 --- a/extensions/aws/processors/PutS3Object.cpp +++ b/extensions/aws/processors/PutS3Object.cpp @@ -107,8 +107,9 @@ const core::Relationship PutS3Object::Failure("failure", "FlowFiles are routed t void PutS3Object::initialize() { // Add new supported properties - updateSupportedProperties({ObjectKey, ContentType, StorageClass, FullControlUserList, ReadPermissionUserList, - ReadACLUserList, WriteACLUserList, CannedACL, ServerSideEncryption}); + setSupportedProperties({Bucket, AccessKey, SecretKey, CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout, + EndpointOverrideURL, ProxyHost, ProxyPort, ProxyUsername, ProxyPassword, UseDefaultCredentials, ObjectKey, ContentType, StorageClass, + FullControlUserList, ReadPermissionUserList, ReadACLUserList, WriteACLUserList, CannedACL, ServerSideEncryption}); // Set the supported relationships setSupportedRelationships({Failure, Success}); } @@ 
-257,7 +258,7 @@ void PutS3Object::setAttributes( } void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - logger_->log_debug("PutS3Object onTrigger"); + logger_->log_trace("PutS3Object onTrigger"); std::shared_ptr<core::FlowFile> flow_file = session->get(); if (!flow_file) { context->yield(); diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp index c720860..5bc0b27 100644 --- a/extensions/aws/processors/S3Processor.cpp +++ b/extensions/aws/processors/S3Processor.cpp @@ -115,18 +115,14 @@ const core::Property S3Processor::UseDefaultCredentials( ->build()); S3Processor::S3Processor(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<logging::Logger> &logger) - : core::Processor(name, uuid) - , logger_(logger) { - setSupportedProperties({Bucket, AccessKey, SecretKey, CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout, - EndpointOverrideURL, ProxyHost, ProxyPort, ProxyUsername, ProxyPassword, UseDefaultCredentials}); + : core::Processor(name, uuid), + logger_(logger) { } S3Processor::S3Processor(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<logging::Logger> &logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender) - : core::Processor(name, uuid) - , logger_(logger) - , s3_wrapper_(std::move(s3_request_sender)) { - setSupportedProperties({Bucket, AccessKey, SecretKey, CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout, - EndpointOverrideURL, ProxyHost, ProxyPort, ProxyUsername, ProxyPassword, UseDefaultCredentials}); + : core::Processor(name, uuid), + logger_(logger), + s3_wrapper_(std::move(s3_request_sender)) { } std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const { 
diff --git a/extensions/azure/CMakeLists.txt b/extensions/azure/CMakeLists.txt index b8a8abc..9304743 100644 --- a/extensions/azure/CMakeLists.txt +++ b/extensions/azure/CMakeLists.txt @@ -32,7 +32,7 @@ target_include_directories(minifi-azure BEFORE PRIVATE ${CMAKE_SOURCE_DIR}/exten target_link_libraries(minifi-azure ${LIBMINIFI} Threads::Threads) target_link_libraries(minifi-azure CURL::libcurl LibXml2::LibXml2) -target_link_libraries(minifi-azure AZURE::azure-storage-blobs AZURE::azure-storage-common AZURE::azure-core) +target_link_libraries(minifi-azure AZURE::azure-storage-files-datalake AZURE::azure-storage-blobs AZURE::azure-storage-common AZURE::azure-core) if (WIN32) target_link_libraries(minifi-azure crypt32.lib bcrypt.lib) diff --git a/extensions/azure/controllerservices/AzureStorageCredentialsService.h b/extensions/azure/controllerservices/AzureStorageCredentialsService.h index 40aa843..8365c86 100644 --- a/extensions/azure/controllerservices/AzureStorageCredentialsService.h +++ b/extensions/azure/controllerservices/AzureStorageCredentialsService.h @@ -24,6 +24,7 @@ #include "core/controller/ControllerService.h" #include "core/logging/LoggerConfiguration.h" #include "storage/AzureStorageCredentials.h" +#include "utils/Export.h" namespace org { namespace apache { @@ -34,11 +35,11 @@ namespace controllers { class AzureStorageCredentialsService : public core::controller::ControllerService { public: - static const core::Property StorageAccountName; - static const core::Property StorageAccountKey; - static const core::Property SASToken; - static const core::Property CommonStorageAccountEndpointSuffix; - static const core::Property ConnectionString; + EXTENSIONAPI static const core::Property StorageAccountName; + EXTENSIONAPI static const core::Property StorageAccountKey; + EXTENSIONAPI static const core::Property SASToken; + EXTENSIONAPI static const core::Property CommonStorageAccountEndpointSuffix; + EXTENSIONAPI static const core::Property 
ConnectionString; explicit AzureStorageCredentialsService(const std::string& name, const minifi::utils::Identifier& uuid = {}) : ControllerService(name, uuid), diff --git a/extensions/azure/processors/AzureStorageProcessorBase.cpp b/extensions/azure/processors/AzureStorageProcessorBase.cpp new file mode 100644 index 0000000..bbbedba --- /dev/null +++ b/extensions/azure/processors/AzureStorageProcessorBase.cpp @@ -0,0 +1,56 @@ +/** + * @file AzureStorageProcessorBase.cpp + * AzureStorageProcessorBase class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "AzureStorageProcessorBase.h" + +#include <memory> +#include <string> + +#include "controllerservices/AzureStorageCredentialsService.h" + +namespace org::apache::nifi::minifi::azure::processors { + +const core::Property AzureStorageProcessorBase::AzureStorageCredentialsService( + core::PropertyBuilder::createProperty("Azure Storage Credentials Service") + ->withDescription("Name of the Azure Storage Credentials Service used to retrieve the connection string from.") + ->build()); + +std::string AzureStorageProcessorBase::getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const { + std::string service_name; + if (!context->getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) { + return ""; + } + + std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name); + if (nullptr == service) { + logger_->log_error("Azure Storage credentials service with name: '%s' could not be found", service_name); + return ""; + } + + auto azure_credentials_service = std::dynamic_pointer_cast<minifi::azure::controllers::AzureStorageCredentialsService>(service); + if (!azure_credentials_service) { + logger_->log_error("Controller service with name: '%s' is not an Azure Storage credentials service", service_name); + return ""; + } + + return azure_credentials_service->getConnectionString(); +} + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/AzureStorageProcessorBase.h b/extensions/azure/processors/AzureStorageProcessorBase.h new file mode 100644 index 0000000..b2e2655 --- /dev/null +++ b/extensions/azure/processors/AzureStorageProcessorBase.h @@ -0,0 +1,51 @@ +/** + * @file AzureStorageProcessorBase.h + * AzureStorageProcessorBase class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <string> + +#include "core/Property.h" +#include "core/Processor.h" +#include "core/logging/Logger.h" + +namespace org::apache::nifi::minifi::azure::processors { + +class AzureStorageProcessorBase : public core::Processor { + public: + // Supported Properties + EXTENSIONAPI static const core::Property AzureStorageCredentialsService; + + AzureStorageProcessorBase(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<logging::Logger>& logger) + : core::Processor(name, uuid), + logger_(logger) { + } + + ~AzureStorageProcessorBase() override = default; + + protected: + std::string getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const; + + std::mutex azure_storage_mutex_; + std::shared_ptr<logging::Logger> logger_; +}; + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp b/extensions/azure/processors/PutAzureBlobStorage.cpp index 58ca495..243105b 100644 --- a/extensions/azure/processors/PutAzureBlobStorage.cpp +++ b/extensions/azure/processors/PutAzureBlobStorage.cpp @@ -27,12 +27,7 @@ #include "controllerservices/AzureStorageCredentialsService.h" #include "core/Resource.h" -namespace 
org { -namespace apache { -namespace nifi { -namespace minifi { -namespace azure { -namespace processors { +namespace org::apache::nifi::minifi::azure::processors { const core::Property PutAzureBlobStorage::ContainerName( core::PropertyBuilder::createProperty("Container Name") @@ -40,10 +35,6 @@ const core::Property PutAzureBlobStorage::ContainerName( ->supportsExpressionLanguage(true) ->isRequired(true) ->build()); -const core::Property PutAzureBlobStorage::AzureStorageCredentialsService( - core::PropertyBuilder::createProperty("Azure Storage Credentials Service") - ->withDescription("Name of the Azure Storage Credentials Service used to retrieve the connection string from.") - ->build()); const core::Property PutAzureBlobStorage::StorageAccountName( core::PropertyBuilder::createProperty("Storage Account Name") ->withDescription("The storage account name.") @@ -92,13 +83,13 @@ const core::Relationship PutAzureBlobStorage::Failure("failure", "Unsuccessful o void PutAzureBlobStorage::initialize() { // Set the supported properties setSupportedProperties({ + AzureStorageCredentialsService, ContainerName, StorageAccountName, StorageAccountKey, SASToken, CommonStorageAccountEndpointSuffix, ConnectionString, - AzureStorageCredentialsService, Blob, CreateContainer }); @@ -147,31 +138,10 @@ void PutAzureBlobStorage::onSchedule(const std::shared_ptr<core::ProcessContext> logger_->log_info("Using storage account name and SAS token for authentication"); } -std::string PutAzureBlobStorage::getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const { - std::string service_name; - if (!context->getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) { - return ""; - } - - std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name); - if (nullptr == service) { - logger_->log_error("Azure Storage credentials service with name: '%s' could not be found", 
service_name.c_str()); - return ""; - } - - auto azure_credentials_service = std::dynamic_pointer_cast<minifi::azure::controllers::AzureStorageCredentialsService>(service); - if (!azure_credentials_service) { - logger_->log_error("Controller service with name: '%s' is not an Azure Storage credentials service", service_name.c_str()); - return ""; - } - - return azure_credentials_service->getConnectionString(); -} - std::string PutAzureBlobStorage::getAzureConnectionStringFromProperties( const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file) { - azure::storage::AzureStorageCredentials credentials; + storage::AzureStorageCredentials credentials; context->getProperty(StorageAccountName, credentials.storage_account_name, flow_file); context->getProperty(StorageAccountKey, credentials.storage_account_key, flow_file); context->getProperty(SASToken, credentials.sas_token, flow_file); @@ -203,7 +173,7 @@ std::string PutAzureBlobStorage::getConnectionString( } void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - logger_->log_debug("PutAzureBlobStorage onTrigger"); + logger_->log_trace("PutAzureBlobStorage onTrigger"); std::shared_ptr<core::FlowFile> flow_file = session->get(); if (!flow_file) { return; @@ -230,8 +200,9 @@ void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext> return; } - std::optional<azure::storage::UploadBlobResult> upload_result; + std::optional<storage::UploadBlobResult> upload_result; { + // TODO(lordgamez): This can be removed after maximum allowed threads are implemented. 
See https://issues.apache.org/jira/browse/MINIFICPP-1566 std::lock_guard<std::mutex> lock(azure_storage_mutex_); createAzureStorageClient(connection_string, container_name); if (create_container_) { @@ -260,9 +231,4 @@ void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext> REGISTER_RESOURCE(PutAzureBlobStorage, "Puts content into an Azure Storage Blob"); -} // namespace processors -} // namespace azure -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/PutAzureBlobStorage.h b/extensions/azure/processors/PutAzureBlobStorage.h index a712d0d..f779cff 100644 --- a/extensions/azure/processors/PutAzureBlobStorage.h +++ b/extensions/azure/processors/PutAzureBlobStorage.h @@ -27,27 +27,19 @@ #include <vector> #include "core/Property.h" -#include "core/Processor.h" #include "core/logging/Logger.h" #include "core/logging/LoggerConfiguration.h" #include "storage/BlobStorage.h" +#include "AzureStorageProcessorBase.h" class PutAzureBlobStorageTestsFixture; -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace azure { -namespace processors { +namespace org::apache::nifi::minifi::azure::processors { -class PutAzureBlobStorage : public core::Processor { +class PutAzureBlobStorage final : public AzureStorageProcessorBase { public: - static constexpr char const* ProcessorName = "PutAzureBlobStorage"; - // Supported Properties static const core::Property ContainerName; - static const core::Property AzureStorageCredentialsService; static const core::Property StorageAccountName; static const core::Property StorageAccountKey; static const core::Property SASToken; @@ -72,7 +64,7 @@ class PutAzureBlobStorage : public core::Processor { class ReadCallback : public InputStreamCallback { public: - ReadCallback(uint64_t flow_size, azure::storage::BlobStorage& blob_storage_wrapper, const std::string 
&blob_name) + ReadCallback(uint64_t flow_size, storage::BlobStorage& blob_storage_wrapper, const std::string &blob_name) : flow_size_(flow_size) , blob_storage_wrapper_(blob_storage_wrapper) , blob_name_(blob_name) { @@ -80,8 +72,8 @@ class PutAzureBlobStorage : public core::Processor { int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { std::vector<uint8_t> buffer; - int read_ret = stream->read(buffer, flow_size_); - if (read_ret < 0) { + size_t read_ret = stream->read(buffer, flow_size_); + if (io::isError(read_ret) || read_ret != flow_size_) { return -1; } @@ -92,26 +84,25 @@ class PutAzureBlobStorage : public core::Processor { return result_->length; } - std::optional<azure::storage::UploadBlobResult> getResult() const { + std::optional<storage::UploadBlobResult> getResult() const { return result_; } private: uint64_t flow_size_; - azure::storage::BlobStorage &blob_storage_wrapper_; + storage::BlobStorage &blob_storage_wrapper_; std::string blob_name_; - std::optional<azure::storage::UploadBlobResult> result_ = std::nullopt; + std::optional<storage::UploadBlobResult> result_ = std::nullopt; }; private: friend class ::PutAzureBlobStorageTestsFixture; explicit PutAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::BlobStorage> blob_storage_wrapper) - : core::Processor(name, uuid) + : AzureStorageProcessorBase(name, uuid, logging::LoggerFactory<PutAzureBlobStorage>::getLogger()) , blob_storage_wrapper_(std::move(blob_storage_wrapper)) { } - std::string getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const; static std::string getAzureConnectionStringFromProperties( const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file); @@ -120,15 +111,8 @@ class PutAzureBlobStorage : public core::Processor { const std::shared_ptr<core::FlowFile> &flow_file) const; void createAzureStorageClient(const std::string 
&connection_string, const std::string &container_name); - std::mutex azure_storage_mutex_; std::unique_ptr<storage::BlobStorage> blob_storage_wrapper_; bool create_container_ = false; - std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<PutAzureBlobStorage>::getLogger()}; }; -} // namespace processors -} // namespace azure -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp b/extensions/azure/processors/PutAzureDataLakeStorage.cpp new file mode 100644 index 0000000..caff11b --- /dev/null +++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp @@ -0,0 +1,177 @@ +/** + * @file PutAzureDataLakeStorage.cpp + * PutAzureDataLakeStorage class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "PutAzureDataLakeStorage.h" + +#include "utils/ProcessorConfigUtils.h" +#include "utils/gsl.h" +#include "controllerservices/AzureStorageCredentialsService.h" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::azure::processors { + +const core::Property PutAzureDataLakeStorage::FilesystemName( + core::PropertyBuilder::createProperty("Filesystem Name") + ->withDescription("Name of the Azure Storage File System. It is assumed to be already existing.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); +const core::Property PutAzureDataLakeStorage::DirectoryName( + core::PropertyBuilder::createProperty("Directory Name") + ->withDescription("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. " + "If left empty it designates the root directory. The directory will be created if not already existing.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureDataLakeStorage::FileName( + core::PropertyBuilder::createProperty("File Name") + ->withDescription("The filename to be uploaded.
If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureDataLakeStorage::ConflictResolutionStrategy( + core::PropertyBuilder::createProperty("Conflict Resolution Strategy") + ->withDescription("Indicates what should happen when a file with the same name already exists in the output directory.") + ->isRequired(true) + ->withDefaultValue<std::string>(toString(FileExistsResolutionStrategy::FAIL_FLOW)) + ->withAllowableValues<std::string>(FileExistsResolutionStrategy::values()) + ->build()); + +const core::Relationship PutAzureDataLakeStorage::Success("success", "Files that have been successfully written to Azure storage are transferred to this relationship"); +const core::Relationship PutAzureDataLakeStorage::Failure("failure", "Files that could not be written to Azure storage for some reason are transferred to this relationship"); + +void PutAzureDataLakeStorage::initialize() { + // Set the supported properties + setSupportedProperties({ + AzureStorageCredentialsService, + FilesystemName, + DirectoryName, + FileName, + ConflictResolutionStrategy + }); + // Set the supported relationships + setSupportedRelationships({ + Success, + Failure + }); +} + +void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) { + connection_string_ = getConnectionStringFromControllerService(context); + if (connection_string_.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid"); + } + + conflict_resolution_strategy_ = FileExistsResolutionStrategy::parse( + utils::parsePropertyWithAllowableValuesOrThrow(*context, ConflictResolutionStrategy.getName(), FileExistsResolutionStrategy::values()).c_str()); +} + +std::optional<storage::PutAzureDataLakeStorageParameters> PutAzureDataLakeStorage::buildUploadParameters( + const
std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) { + storage::PutAzureDataLakeStorageParameters params; + params.connection_string = connection_string_; + params.replace_file = conflict_resolution_strategy_ == FileExistsResolutionStrategy::REPLACE_FILE; + + if (!context->getProperty(FilesystemName, params.file_system_name, flow_file) || params.file_system_name.empty()) { + logger_->log_error("Filesystem Name '%s' is invalid or empty!", params.file_system_name); + return std::nullopt; + } + + context->getProperty(DirectoryName, params.directory_name, flow_file); + + context->getProperty(FileName, params.filename, flow_file); + if (params.filename.empty() && (!flow_file->getAttribute("filename", params.filename) || params.filename.empty())) { + logger_->log_error("No File Name is set and default object key 'filename' attribute could not be found!"); + return std::nullopt; + } + + return params; +} + +void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + logger_->log_trace("PutAzureDataLakeStorage onTrigger"); + std::shared_ptr<core::FlowFile> flow_file = session->get(); + if (!flow_file) { + context->yield(); + return; + } + + const auto params = buildUploadParameters(context, flow_file); + if (!params) { + session->transfer(flow_file, Failure); + return; + } + + storage::UploadDataLakeStorageResult result; + { + // TODO(lordgamez): This can be removed after maximum allowed threads are implemented.
See https://issues.apache.org/jira/browse/MINIFICPP-1566 + std::lock_guard<std::mutex> lock(azure_storage_mutex_); + PutAzureDataLakeStorage::ReadCallback callback(flow_file->getSize(), azure_data_lake_storage_, *params, logger_); + session->read(flow_file, &callback); + result = callback.getResult(); + } + + if (result.result_code == storage::UploadResultCode::FILE_ALREADY_EXISTS) { + gsl_Expects(conflict_resolution_strategy_ != FileExistsResolutionStrategy::REPLACE_FILE); // uploadFile() reports FILE_ALREADY_EXISTS only when replace_file is false + if (conflict_resolution_strategy_ == FileExistsResolutionStrategy::FAIL_FLOW) { + logger_->log_error("Failed to upload file '%s/%s' to filesystem '%s' on Azure Data Lake storage because file already exists", + params->directory_name, params->filename, params->file_system_name); + session->transfer(flow_file, Failure); + return; + } else if (conflict_resolution_strategy_ == FileExistsResolutionStrategy::IGNORE_REQUEST) { + logger_->log_debug("Upload of file '%s/%s' was ignored because it already exists in filesystem '%s' on Azure Data Lake Storage", + params->directory_name, params->filename, params->file_system_name); + session->transfer(flow_file, Success); + return; + } + } else if (result.result_code == storage::UploadResultCode::FAILURE) { + logger_->log_error("Failed to upload file '%s/%s' to filesystem '%s' on Azure Data Lake storage", params->directory_name, params->filename, params->file_system_name); + session->transfer(flow_file, Failure); + } else { + session->putAttribute(flow_file, "azure.filesystem", params->file_system_name); + session->putAttribute(flow_file, "azure.directory", params->directory_name); + session->putAttribute(flow_file, "azure.filename", params->filename); + session->putAttribute(flow_file, "azure.primaryUri", result.primary_uri); + session->putAttribute(flow_file, "azure.length", std::to_string(flow_file->getSize())); + logger_->log_debug("Successfully uploaded file '%s/%s' to filesystem '%s' on Azure Data Lake storage", params->directory_name,
params->filename, params->file_system_name); + session->transfer(flow_file, Success); + } +} + +PutAzureDataLakeStorage::ReadCallback::ReadCallback( + uint64_t flow_size, storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::PutAzureDataLakeStorageParameters& params, std::shared_ptr<logging::Logger> logger) + : flow_size_(flow_size), + azure_data_lake_storage_(azure_data_lake_storage), + params_(params), + logger_(std::move(logger)) { +} + +int64_t PutAzureDataLakeStorage::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) { + std::vector<uint8_t> buffer; + size_t read_ret = stream->read(buffer, flow_size_); + if (io::isError(read_ret) || read_ret != flow_size_) { + return -1; + } + + result_ = azure_data_lake_storage_.uploadFile(params_, gsl::span<const uint8_t>{buffer.data(), flow_size_}); + return read_ret; +} + +REGISTER_RESOURCE(PutAzureDataLakeStorage, "Puts content into an Azure Data Lake Storage Gen 2"); + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h new file mode 100644 index 0000000..a8fb1f3 --- /dev/null +++ b/extensions/azure/processors/PutAzureDataLakeStorage.h @@ -0,0 +1,103 @@ +/** + * @file PutAzureDataLakeStorage.h + * PutAzureDataLakeStorage class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <utility> +#include <string> +#include <memory> +#include <optional> +#include <vector> + +#include "core/Property.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "storage/AzureDataLakeStorage.h" +#include "utils/Enum.h" +#include "utils/Export.h" +#include "AzureStorageProcessorBase.h" + +class PutAzureDataLakeStorageTestsFixture; + +namespace org::apache::nifi::minifi::azure::processors { + +class PutAzureDataLakeStorage final : public AzureStorageProcessorBase { + public: + // Supported Properties + EXTENSIONAPI static const core::Property FilesystemName; + EXTENSIONAPI static const core::Property DirectoryName; + EXTENSIONAPI static const core::Property FileName; + EXTENSIONAPI static const core::Property ConflictResolutionStrategy; + + // Supported Relationships + EXTENSIONAPI static const core::Relationship Failure; + EXTENSIONAPI static const core::Relationship Success; + + SMART_ENUM(FileExistsResolutionStrategy, + (FAIL_FLOW, "fail"), + (REPLACE_FILE, "replace"), + (IGNORE_REQUEST, "ignore") + ) + + explicit PutAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier()) + : PutAzureDataLakeStorage(name, uuid, nullptr) { + } + + void initialize() override; + void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; + void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) override; + + private: + friend class ::PutAzureDataLakeStorageTestsFixture; + + class ReadCallback : public InputStreamCallback { + public: + ReadCallback(uint64_t flow_size, storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::PutAzureDataLakeStorageParameters& params, std::shared_ptr<logging::Logger> logger); + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override; + + storage::UploadDataLakeStorageResult getResult() const { + return result_; + } + + private: + uint64_t flow_size_; + storage::AzureDataLakeStorage& azure_data_lake_storage_; + const storage::PutAzureDataLakeStorageParameters& params_; + storage::UploadDataLakeStorageResult result_; + std::shared_ptr<logging::Logger> logger_; + }; + + core::annotation::Input getInputRequirement() const override { + return core::annotation::Input::INPUT_REQUIRED; + } + + explicit PutAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client) + : AzureStorageProcessorBase(name, uuid, logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger()), + azure_data_lake_storage_(std::move(data_lake_storage_client)) { + } + + std::optional<storage::PutAzureDataLakeStorageParameters> buildUploadParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file); + + std::string connection_string_; + FileExistsResolutionStrategy conflict_resolution_strategy_; + storage::AzureDataLakeStorage azure_data_lake_storage_; +}; + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/storage/AzureBlobStorage.cpp b/extensions/azure/storage/AzureBlobStorage.cpp index 8be6729..5d3fb46 100644 --- a/extensions/azure/storage/AzureBlobStorage.cpp +++ b/extensions/azure/storage/AzureBlobStorage.cpp @@ -60,17 +60,14 @@ std::optional<UploadBlobResult> 
AzureBlobStorage::uploadBlob(const std::string & try { auto blob_client = container_client_->GetBlockBlobClient(blob_name); auto response = blob_client.UploadFrom(buffer, buffer_size); - if (!response.HasValue()) { - return std::nullopt; - } UploadBlobResult result; result.length = buffer_size; result.primary_uri = container_client_->GetUrl(); - if (response->ETag.HasValue()) { - result.etag = response->ETag.ToString(); + if (response.Value.ETag.HasValue()) { + result.etag = response.Value.ETag.ToString(); } - result.timestamp = response->LastModified.GetString(Azure::Core::DateTime::DateFormat::Rfc1123); + result.timestamp = response.Value.LastModified.ToString(Azure::DateTime::DateFormat::Rfc1123); return result; } catch (const std::runtime_error& err) { logger_->log_error("A runtime error occurred while uploading blob: %s", err.what()); diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp new file mode 100644 index 0000000..82e3d5a --- /dev/null +++ b/extensions/azure/storage/AzureDataLakeStorage.cpp @@ -0,0 +1,54 @@ +/** + * @file AzureDataLakeStorage.cpp + * AzureDataLakeStorage class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "AzureDataLakeStorage.h" + +#include "AzureDataLakeStorageClient.h" + +namespace org::apache::nifi::minifi::azure::storage { + +AzureDataLakeStorage::AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient> data_lake_storage_client) + : data_lake_storage_client_(data_lake_storage_client ? std::move(data_lake_storage_client) : std::make_unique<AzureDataLakeStorageClient>()) { +} + +UploadDataLakeStorageResult AzureDataLakeStorage::uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) { + UploadDataLakeStorageResult result; + try { + auto file_created = data_lake_storage_client_->createFile(params); + if (!file_created && !params.replace_file) { + logger_->log_warn("File '%s/%s' already exists on Azure Data Lake Storage filesystem '%s'", params.directory_name, params.filename, params.file_system_name); + result.result_code = UploadResultCode::FILE_ALREADY_EXISTS; + return result; + } + + auto upload_url = data_lake_storage_client_->uploadFile(params, buffer); + if (auto query_string_pos = upload_url.find('?'); query_string_pos != std::string::npos) { + upload_url = upload_url.substr(0, query_string_pos); + } + result.primary_uri = upload_url; + return result; + } catch(const std::exception& ex) { + logger_->log_error("An exception occurred while uploading file to Azure Data Lake storage: %s", ex.what()); + result.result_code = UploadResultCode::FAILURE; + return result; + } +} + +} // namespace org::apache::nifi::minifi::azure::storage diff --git a/extensions/azure/storage/AzureDataLakeStorage.h b/extensions/azure/storage/AzureDataLakeStorage.h new file mode 100644 index 0000000..f4b67e7 --- /dev/null +++ b/extensions/azure/storage/AzureDataLakeStorage.h @@ -0,0 +1,56 @@ +/** + * @file AzureDataLakeStorage.h + * AzureDataLakeStorage class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string> +#include <vector> +#include <memory> +#include <optional> +#include <utility> + +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "DataLakeStorageClient.h" + +namespace org::apache::nifi::minifi::azure::storage { + +enum class UploadResultCode { + SUCCESS, + FILE_ALREADY_EXISTS, + FAILURE +}; + +struct UploadDataLakeStorageResult { + UploadResultCode result_code = UploadResultCode::SUCCESS; + std::string primary_uri; +}; + +class AzureDataLakeStorage { + public: + explicit AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient> data_lake_storage_client = nullptr); + + storage::UploadDataLakeStorageResult uploadFile(const storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer); + + private: + std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<AzureDataLakeStorage>::getLogger()}; + std::unique_ptr<DataLakeStorageClient> data_lake_storage_client_; +}; + +} // namespace org::apache::nifi::minifi::azure::storage diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp new file mode 100644 index 0000000..8c9f2a2 --- /dev/null +++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp @@ -0,0 
+1,55 @@ +/** + * @file AzureDataLakeStorageClient.cpp + * AzureDataLakeStorageClient class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AzureDataLakeStorageClient.h" + +namespace org::apache::nifi::minifi::azure::storage { + +void AzureDataLakeStorageClient::resetClientIfNeeded(const std::string& connection_string, const std::string& file_system_name) { + if (client_ == nullptr || connection_string_ != connection_string || file_system_name_ != file_system_name) { + client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>( + Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(connection_string, file_system_name)); + file_system_name_ = file_system_name; + connection_string_ = connection_string; + } +} + +Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const PutAzureDataLakeStorageParameters& params) { + resetClientIfNeeded(params.connection_string, params.file_system_name); + auto directory_client = client_->GetDirectoryClient(params.directory_name); + if (!params.directory_name.empty()) { + directory_client.CreateIfNotExists(); + } + return 
directory_client.GetFileClient(params.filename); +} + +bool AzureDataLakeStorageClient::createFile(const PutAzureDataLakeStorageParameters& params) { + auto file_client = getFileClient(params); + auto response = file_client.CreateIfNotExists(); + return response.Value.Created; +} + +std::string AzureDataLakeStorageClient::uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) { + auto file_client = getFileClient(params); + file_client.UploadFrom(buffer.data(), buffer.size()); + return file_client.GetUrl(); +} + +} // namespace org::apache::nifi::minifi::azure::storage diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h new file mode 100644 index 0000000..02b9362 --- /dev/null +++ b/extensions/azure/storage/AzureDataLakeStorageClient.h @@ -0,0 +1,59 @@ +/** + * @file AzureDataLakeStorageClient.h + * AzureDataLakeStorageClient class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include <string> +#include <memory> + +#include <azure/storage/files/datalake.hpp> + +#include "DataLakeStorageClient.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org::apache::nifi::minifi::azure::storage { + +class AzureDataLakeStorageClient : public DataLakeStorageClient { + public: + /** + * Creates the file on Azure Data Lake Storage if it does not exist yet; a non-empty directory name is created first if missing + * @param params Parameters required for connecting and file creation on Azure + * @return True if a new file was created, false otherwise (e.g. the file already exists) + */ + bool createFile(const PutAzureDataLakeStorageParameters& params) override; + + /** + * Uploads the contents of the buffer to a file on Azure Data Lake Storage + * @param params Parameters required for connecting and file access on Azure + * @param buffer Buffer containing the data to be uploaded + * @return URL of the uploaded file + */ + std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override; + + private: + void resetClientIfNeeded(const std::string& connection_string, const std::string& file_system_name); // rebuilds client_ when the settings change + Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const PutAzureDataLakeStorageParameters& params); + + std::string connection_string_; + std::string file_system_name_; + std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> client_; +}; + +} // namespace org::apache::nifi::minifi::azure::storage diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/DataLakeStorageClient.h new file mode 100644 index 0000000..f564426 --- /dev/null +++ b/extensions/azure/storage/DataLakeStorageClient.h @@ -0,0 +1,43 @@ +/** + * @file DataLakeStorageClient.h + * DataLakeStorageClient class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string> + +#include "gsl/gsl-lite.hpp" + +namespace org::apache::nifi::minifi::azure::storage { + +struct PutAzureDataLakeStorageParameters { + std::string connection_string; + std::string file_system_name; + std::string directory_name; + std::string filename; + bool replace_file = false; // set for the 'replace' conflict resolution strategy +}; + +class DataLakeStorageClient { + public: + virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0; // true if a new file was created + virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) = 0; // returns the URL of the uploaded file + virtual ~DataLakeStorageClient() = default; +}; + +} // namespace org::apache::nifi::minifi::azure::storage diff --git a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp new file mode 100644 index 0000000..2b56897 --- /dev/null +++ b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp @@ -0,0 +1,277 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../TestBase.h" +#include "utils/IntegrationTestUtils.h" +#include "utils/TestUtils.h" +#include "core/Processor.h" +#include "processors/PutAzureDataLakeStorage.h" +#include "processors/GetFile.h" +#include "processors/PutFile.h" +#include "processors/LogAttribute.h" +#include "processors/UpdateAttribute.h" +#include "storage/DataLakeStorageClient.h" +#include "utils/file/FileUtils.h" +#include "controllerservices/AzureStorageCredentialsService.h" + +using namespace std::chrono_literals; + +const std::string FILESYSTEM_NAME = "testfilesystem"; +const std::string DIRECTORY_NAME = "testdir"; +const std::string FILE_NAME = "testfile.txt"; +const std::string CONNECTION_STRING = "test-connectionstring"; +const std::string TEST_DATA = "data123"; +const std::string GETFILE_FILE_NAME = "input_data.log"; + +class MockDataLakeStorageClient : public minifi::azure::storage::DataLakeStorageClient { + public: + const std::string PRIMARY_URI = "http://test-uri/file"; + + bool createFile(const minifi::azure::storage::PutAzureDataLakeStorageParameters& /*params*/) override { + if (file_creation_error_) { + throw std::runtime_error("error"); + } + return create_file_; + } + + std::string uploadFile(const minifi::azure::storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override { + input_data_ = std::string(buffer.begin(), buffer.end()); + params_ = params; + + if (upload_fails_) { + throw std::runtime_error("error"); + } + + return RETURNED_PRIMARY_URI; + } + + void setFileCreation(bool create_file) { + create_file_ =
create_file; + } + + void setFileCreationError(bool file_creation_error) { + file_creation_error_ = file_creation_error; + } + + void setUploadFailure(bool upload_fails) { + upload_fails_ = upload_fails; + } + + minifi::azure::storage::PutAzureDataLakeStorageParameters getPassedParams() const { + return params_; + } + + std::string getInputData() const { + return input_data_; + } + + private: + const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas"; // azure.primaryUri is expected to be PRIMARY_URI, i.e. this URL without its query part + bool create_file_ = true; + bool file_creation_error_ = false; + bool upload_fails_ = false; + std::string input_data_; + minifi::azure::storage::PutAzureDataLakeStorageParameters params_; +}; + +class PutAzureDataLakeStorageTestsFixture { + public: + PutAzureDataLakeStorageTestsFixture() { + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::core::Processor>(); + LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); + LogTestController::getInstance().setTrace<processors::GetFile>(); + LogTestController::getInstance().setTrace<processors::PutFile>(); + LogTestController::getInstance().setDebug<processors::UpdateAttribute>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setTrace<minifi::azure::processors::PutAzureDataLakeStorage>(); + + // Build MiNiFi processing graph + plan_ = test_controller_.createPlan(); + auto mock_data_lake_storage_client = std::make_unique<MockDataLakeStorageClient>(); + mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get(); + put_azure_data_lake_storage_ = std::shared_ptr<minifi::azure::processors::PutAzureDataLakeStorage>( + new minifi::azure::processors::PutAzureDataLakeStorage("PutAzureDataLakeStorage", utils::Identifier(), std::move(mock_data_lake_storage_client))); + auto input_dir = test_controller_.createTempDirectory(); + utils::putFileToDir(input_dir, GETFILE_FILE_NAME, TEST_DATA); + + get_file_ = plan_->addProcessor("GetFile", "GetFile"); + plan_->setProperty(get_file_,
processors::GetFile::Directory.getName(), input_dir); + plan_->setProperty(get_file_, processors::GetFile::KeepSourceFile.getName(), "false"); + + update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true); + plan_->addProcessor(put_azure_data_lake_storage_, "PutAzureDataLakeStorage", { {"success", "d"}, {"failure", "d"} }, true); + auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true); + logattribute->setAutoTerminatedRelationships({{"success", "d"}}); + + putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false); + plan_->addConnection(put_azure_data_lake_storage_, {"failure", "d"}, putfile_); + putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}}); + output_dir_ = test_controller_.createTempDirectory(); + plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_); + + azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); + setDefaultProperties(); + } + + std::vector<std::string> getFailedFlowFileContents() { + std::vector<std::string> file_contents; + + auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool { + std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary); + std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()); + file_contents.push_back(std::move(file_content)); + return true; + }; + + utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false); + return file_contents; + } + + void setDefaultProperties() { + plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "AzureStorageCredentialsService"); + plan_->setProperty(update_attribute_, "test.filesystemname", FILESYSTEM_NAME,
true); + plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::FilesystemName.getName(), "${test.filesystemname}"); + plan_->setProperty(update_attribute_, "test.directoryname", DIRECTORY_NAME, true); + plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), "${test.directoryname}"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING); + } + + virtual ~PutAzureDataLakeStorageTestsFixture() { + LogTestController::getInstance().reset(); + } + + protected: + TestController test_controller_; + std::shared_ptr<TestPlan> plan_; + MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_ = nullptr; // non-owning: the processor owns the mock + std::shared_ptr<core::Processor> put_azure_data_lake_storage_; + std::shared_ptr<core::Processor> get_file_; + std::shared_ptr<core::Processor> update_attribute_; + std::shared_ptr<core::Processor> putfile_; + std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_; + std::string output_dir_; +}; + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") { + plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), ""); + REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); + REQUIRE(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") { + plan_->setProperty(update_attribute_, "test.filesystemname", "", true); + test_controller_.runSession(plan_, true); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + REQUIRE(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!")); + auto
failed_flowfiles = getFailedFlowFileContents(); + REQUIRE(failed_flowfiles.size() == 1); + REQUIRE(failed_flowfiles[0] == TEST_DATA); +} + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Connection String is empty", "[azureDataLakeStorageParameters]") { + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), ""); + REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); + REQUIRE(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with default parameters", "[azureDataLakeStorageUpload]") { + test_controller_.runSession(plan_, true); + REQUIRE(getFailedFlowFileContents().size() == 0); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + DIRECTORY_NAME)); + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.filename value:" + GETFILE_FILE_NAME)); + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + FILESYSTEM_NAME)); + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:" + std::to_string(TEST_DATA.size()))); + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_data_lake_storage_client_ptr_->PRIMARY_URI + "\n")); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams(); + REQUIRE(passed_params.connection_string == CONNECTION_STRING); + REQUIRE(passed_params.file_system_name == FILESYSTEM_NAME); + REQUIRE(passed_params.directory_name == DIRECTORY_NAME); + REQUIRE(passed_params.filename == GETFILE_FILE_NAME); + REQUIRE_FALSE(passed_params.replace_file); + REQUIRE(mock_data_lake_storage_client_ptr_->getInputData() == TEST_DATA); +} + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "File creation fails", "[azureDataLakeStorageUpload]") { + mock_data_lake_storage_client_ptr_->setFileCreationError(true); + test_controller_.runSession(plan_, true); + auto
failed_flowfiles = getFailedFlowFileContents(); + REQUIRE(failed_flowfiles.size() == 1); + REQUIRE(failed_flowfiles[0] == TEST_DATA); +} + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "File upload fails", "[azureDataLakeStorageUpload]") { + mock_data_lake_storage_client_ptr_->setUploadFailure(true); + test_controller_.runSession(plan_, true); + auto failed_flowfiles = getFailedFlowFileContents(); + REQUIRE(failed_flowfiles.size() == 1); + REQUIRE(failed_flowfiles[0] == TEST_DATA); +} + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to failure on 'fail' resolution strategy if file exists", "[azureDataLakeStorageUpload]") { + mock_data_lake_storage_client_ptr_->setFileCreation(false); + test_controller_.runSession(plan_, true); + auto failed_flowfiles = getFailedFlowFileContents(); + REQUIRE(failed_flowfiles.size() == 1); + REQUIRE(failed_flowfiles[0] == TEST_DATA); +} + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to success on 'ignore' resolution strategy if file exists", "[azureDataLakeStorageUpload]") { + plan_->setProperty(put_azure_data_lake_storage_, + minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy.getName(), + toString(minifi::azure::processors::PutAzureDataLakeStorage::FileExistsResolutionStrategy::IGNORE_REQUEST)); + mock_data_lake_storage_client_ptr_->setFileCreation(false); + test_controller_.runSession(plan_, true); + REQUIRE(getFailedFlowFileContents().size() == 0); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:filename value:" + GETFILE_FILE_NAME)); + REQUIRE_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 0ms)); +} + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'replace' resolution strategy if file exists", "[azureDataLakeStorageUpload]") { + plan_->setProperty(put_azure_data_lake_storage_, +
minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy.getName(), + toString(minifi::azure::processors::PutAzureDataLakeStorage::FileExistsResolutionStrategy::REPLACE_FILE)); + mock_data_lake_storage_client_ptr_->setFileCreation(false); + test_controller_.runSession(plan_, true); + REQUIRE(getFailedFlowFileContents().size() == 0); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + DIRECTORY_NAME)); + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.filename value:" + GETFILE_FILE_NAME)); + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + FILESYSTEM_NAME)); + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:" + std::to_string(TEST_DATA.size()))); + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_data_lake_storage_client_ptr_->PRIMARY_URI + "\n")); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams(); + REQUIRE(passed_params.connection_string == CONNECTION_STRING); + REQUIRE(passed_params.file_system_name == FILESYSTEM_NAME); + REQUIRE(passed_params.directory_name == DIRECTORY_NAME); + REQUIRE(passed_params.filename == GETFILE_FILE_NAME); + REQUIRE(passed_params.replace_file); + REQUIRE(mock_data_lake_storage_client_ptr_->getInputData() == TEST_DATA); +} + +TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with empty directory is accepted", "[azureDataLakeStorageUpload]") { + plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), ""); + test_controller_.runSession(plan_, true); + REQUIRE(getFailedFlowFileContents().size() == 0); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:\n")); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams(); + REQUIRE(passed_params.directory_name == "");
+} diff --git a/thirdparty/azure-sdk-cpp-for-cpp/azure-sdk-for-cpp-old-compiler.patch b/thirdparty/azure-sdk-cpp-for-cpp/azure-sdk-for-cpp-old-compiler.patch deleted file mode 100644 index c54af17..0000000 --- a/thirdparty/azure-sdk-cpp-for-cpp/azure-sdk-for-cpp-old-compiler.patch +++ /dev/null @@ -1,42 +0,0 @@ -diff -rupN orig/sdk/core/azure-core/inc/azure/core/context.hpp patched/sdk/core/azure-core/inc/azure/core/context.hpp ---- orig/sdk/core/azure-core/inc/azure/core/context.hpp 2021-02-03 09:06:18.580502882 +0000 -+++ patched/sdk/core/azure-core/inc/azure/core/context.hpp 2021-02-03 09:07:11.302899054 +0000 -@@ -255,7 +255,7 @@ namespace Azure { namespace Core { - struct ContextSharedState - { - std::shared_ptr<ContextSharedState> Parent; -- std::atomic_int64_t CancelAtMsecSinceEpoch; -+ std::atomic<int64_t> CancelAtMsecSinceEpoch; - std::string Key; - ContextValue Value; - -diff -rupN orig/sdk/core/azure-core/src/http/policy.cpp patched/sdk/core/azure-core/src/http/policy.cpp ---- orig/sdk/core/azure-core/src/http/policy.cpp 2021-02-03 09:10:44.454678199 +0000 -+++ patched/sdk/core/azure-core/src/http/policy.cpp 2021-02-03 09:11:15.535238932 +0000 -@@ -10,10 +10,10 @@ using namespace Azure::Core::Http; - #ifndef _MSC_VER - // Non-MSVC compilers do require allocation of statics, even if they are const constexpr. - // MSVC, on the other hand, has problem if you "redefine" static constexprs.
--Azure::Core::Logging::LogClassification const Azure::Core::Http::LogClassification::Request; --Azure::Core::Logging::LogClassification const Azure::Core::Http::LogClassification::Response; --Azure::Core::Logging::LogClassification const Azure::Core::Http::LogClassification::Retry; --Azure::Core::Logging::LogClassification const -+constexpr Azure::Core::Logging::LogClassification const Azure::Core::Http::LogClassification::Request; -+constexpr Azure::Core::Logging::LogClassification const Azure::Core::Http::LogClassification::Response; -+constexpr Azure::Core::Logging::LogClassification const Azure::Core::Http::LogClassification::Retry; -+constexpr Azure::Core::Logging::LogClassification const - Azure::Core::Http::LogClassification::HttpTransportAdapter; - #endif - -diff -rupN orig/sdk/keyvault/azure-security-keyvault-keys/src/key_client.cpp patched/sdk/keyvault/azure-security-keyvault-keys/src/key_client.cpp ---- orig/sdk/keyvault/azure-security-keyvault-keys/src/key_client.cpp 2021-02-10 10:35:03.305252930 +0100 -+++ patched/sdk/keyvault/azure-security-keyvault-keys/src/key_client.cpp 2021-02-10 16:11:25.139169400 +0100 -@@ -16,7 +16,7 @@ using namespace Azure::Core::Http; - - KeyClient::KeyClient( - std::string const& vaultUrl, -- std::shared_ptr<Core::TokenCredential const> credential, -+ std::shared_ptr<Azure::Core::TokenCredential const> credential, - KeyClientOptions options) - { - auto apiVersion = options.GetVersionString(); diff --git a/thirdparty/azure-sdk-cpp-for-cpp/fix-illegal-qualified-name-in-member.patch b/thirdparty/azure-sdk-cpp-for-cpp/fix-illegal-qualified-name-in-member.patch deleted file mode 100644 index 938da90..0000000 --- a/thirdparty/azure-sdk-cpp-for-cpp/fix-illegal-qualified-name-in-member.patch +++ /dev/null @@ -1,14 +0,0 @@ -diff -rupN orig/sdk/core/azure-core/inc/azure/core/http/winhttp/win_http_client.hpp patched/sdk/core/azure-core/inc/azure/core/http/winhttp/win_http_client.hpp ---- 
orig/sdk/core/azure-core/inc/azure/core/http/winhttp/win_http_client.hpp 2021-06-07 16:51:23.179818286 +0200 -+++ patched/sdk/core/azure-core/inc/azure/core/http/winhttp/win_http_client.hpp 2021-06-07 16:51:55.536150585 +0200 -@@ -132,8 +132,8 @@ namespace Azure { namespace Core { names - void CreateRequestHandle(std::unique_ptr<Details::HandleManager>& handleManager); - void Upload(std::unique_ptr<Details::HandleManager>& handleManager); - void SendRequest(std::unique_ptr<Details::HandleManager>& handleManager); -- void WinHttpTransport::ReceiveResponse(std::unique_ptr<Details::HandleManager>& handleManager); -- int64_t WinHttpTransport::GetContentLength( -+ void ReceiveResponse(std::unique_ptr<Details::HandleManager>& handleManager); -+ int64_t GetContentLength( - std::unique_ptr<Details::HandleManager>& handleManager, - HttpMethod requestMethod, - HttpStatusCode responseStatusCode); diff --git a/thirdparty/azure-sdk-cpp/azure-sdk-cpp-remove-samples.patch b/thirdparty/azure-sdk-cpp/azure-sdk-cpp-remove-samples.patch new file mode 100644 index 0000000..c9e5ea6 --- /dev/null +++ b/thirdparty/azure-sdk-cpp/azure-sdk-cpp-remove-samples.patch @@ -0,0 +1,23 @@ +# The samples require the OpenSSL library on the host, which should not be a build requirement +diff --git a/sdk/identity/azure-identity/CMakeLists.txt b/sdk/identity/azure-identity/CMakeLists.txt +index 5a099b0e..ba8920dc 100644 +--- a/sdk/identity/azure-identity/CMakeLists.txt ++++ b/sdk/identity/azure-identity/CMakeLists.txt +@@ -92,6 +92,3 @@ if (BUILD_PERFORMANCE_TESTS) + add_subdirectory(test/perf) + endif() + +-if (AZ_ALL_LIBRARIES) +- add_subdirectory(samples) +-endif() + +diff --git a/CMakeLists.txt b/CMakeLists.txt +index 173bca57..e5e4e9a0 100644 +--- a/CMakeLists.txt ++++ b/CMakeLists.txt +@@ -95,4 +95,3 @@ add_subdirectory(sdk/identity) + add_subdirectory(sdk/keyvault) + add_subdirectory(sdk/storage) + add_subdirectory(sdk/template) +-add_subdirectory(samples/integration/vcpkg-keyvault) +
