This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a9a0ade72d2c278d457f2b952fb67d7d4eaa981a Author: Gabor Gyimesi <[email protected]> AuthorDate: Thu Aug 19 11:35:13 2021 +0200 MINIFICPP-1630 Create FetchAzureDataLakeStorage processor Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1221 --- PROCESSORS.md | 28 +++++ README.md | 2 +- extensions/aws/processors/FetchS3Object.cpp | 2 +- extensions/aws/processors/FetchS3Object.h | 11 +- .../azure/processors/FetchAzureDataLakeStorage.cpp | 126 +++++++++++++++++++ .../azure/processors/FetchAzureDataLakeStorage.h | 100 ++++++++++++++++ extensions/azure/storage/AzureDataLakeStorage.cpp | 13 +- extensions/azure/storage/AzureDataLakeStorage.h | 2 + .../azure/storage/AzureDataLakeStorageClient.cpp | 39 +++++- .../azure/storage/AzureDataLakeStorageClient.h | 29 ++++- extensions/azure/storage/DataLakeStorageClient.h | 9 ++ .../azure-tests/AzureDataLakeStorageTestsFixture.h | 33 +++-- .../azure-tests/FetchAzureDataLakeStorageTests.cpp | 133 +++++++++++++++++++++ .../test/azure-tests/MockDataLakeStorageClient.h | 37 ++++++ 14 files changed, 537 insertions(+), 27 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 5b393a0..6a2dc8e 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -21,6 +21,7 @@ - [ExecuteSQL](#executesql) - [ExecuteScript](#executescript) - [ExtractText](#extracttext) +- [FetchAzureDataLakeStorage](#fetchazuredatalakestorage) - [FetchOPCProcessor](#fetchopcprocessor) - [FetchS3Object](#fetchs3object) - [FetchSFTP](#fetchsftp) @@ -504,6 +505,33 @@ In the list below, the names of required properties appear in bold. Any other pr |success|success operational on the flow record| +## FetchAzureDataLakeStorage + +### Description + +Fetch the provided file from Azure Data Lake Storage Gen 2 +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. 
+ +| Name | Default Value | Allowable Values | Description | +| - | - | - | - | +|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.| +|File Name|||The filename in Azure Storage. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**| +|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**| +|Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**| +|Range Start|||The byte position at which to start reading from the object. An empty value or a value of zero will start reading at the beginning of the object.<br/>**Supports Expression Language: true**| +|Range Length|||The number of bytes to download from the object, starting from the Range Start. An empty value or a value that extends beyond the end of the object will read to the end of the object.<br/>**Supports Expression Language: true**| +|Number of Retries|0||The number of automatic retries to perform if the download fails.<br/>**Supports Expression Language: true**| + +### Relationships + +| Name | Description | +| - | - | +|failure|In case of fetch failure flowfiles are transferred to this relationship| +|success|Files that have been successfully fetched from Azure storage are transferred to this relationship| + + ## FetchOPCProcessor ### Description diff --git a/README.md b/README.md index 934478b..7dac13e 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ Through JNI extensions you can run NiFi processors using NARs. 
The JNI extension | ------------- |:-------------| :-----| | Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) | -DBUILD_LIBARCHIVE=ON | | AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON | -| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage) | -DENABLE_AZURE=ON | +| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](#fetchazuredatalakestorage) | -DENABLE_AZURE=ON | | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON | | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON | | GPS | GetGPS | -DENABLE_GPS=ON | diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp index c7de905..d0099b4 100644 --- a/extensions/aws/processors/FetchS3Object.cpp +++ b/extensions/aws/processors/FetchS3Object.cpp @@ -109,7 +109,7 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte return; } - WriteCallback callback(flow_file->getSize(), *get_object_params, s3_wrapper_); + WriteCallback callback(*get_object_params, 
s3_wrapper_); session->write(flow_file, &callback); if (callback.result_) { diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h index 7e0d79c..23f2dca 100644 --- a/extensions/aws/processors/FetchS3Object.h +++ b/extensions/aws/processors/FetchS3Object.h @@ -65,14 +65,12 @@ class FetchS3Object : public S3Processor { class WriteCallback : public OutputStreamCallback { public: - WriteCallback(uint64_t flow_size, const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3Wrapper& s3_wrapper) - : flow_size_(flow_size) - , get_object_params_(get_object_params) - , s3_wrapper_(s3_wrapper) { + WriteCallback(const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3Wrapper& s3_wrapper) + : get_object_params_(get_object_params), + s3_wrapper_(s3_wrapper) { } int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { - std::vector<uint8_t> buffer; result_ = s3_wrapper_.getObject(get_object_params_, *stream); if (!result_) { return 0; @@ -81,11 +79,8 @@ class FetchS3Object : public S3Processor { return result_->write_size; } - uint64_t flow_size_; - const minifi::aws::s3::GetObjectRequestParameters& get_object_params_; aws::s3::S3Wrapper& s3_wrapper_; - uint64_t write_size_ = 0; std::optional<minifi::aws::s3::GetObjectResult> result_; }; diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp new file mode 100644 index 0000000..4885190 --- /dev/null +++ b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp @@ -0,0 +1,126 @@ +/** + * @file FetchAzureDataLakeStorage.cpp + * FetchAzureDataLakeStorage class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchAzureDataLakeStorage.h" + +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::azure::processors { + +const core::Property FetchAzureDataLakeStorage::RangeStart( + core::PropertyBuilder::createProperty("Range Start") + ->withDescription("The byte position at which to start reading from the object. An empty value or a value of zero will start reading at the beginning of the object.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property FetchAzureDataLakeStorage::RangeLength( + core::PropertyBuilder::createProperty("Range Length") + ->withDescription("The number of bytes to download from the object, starting from the Range Start. 
" + "An empty value or a value that extends beyond the end of the object will read to the end of the object.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property FetchAzureDataLakeStorage::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of Retries") + ->withDescription("The number of automatic retries to perform if the download fails.") + ->withDefaultValue<uint64_t>(0) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Relationship FetchAzureDataLakeStorage::Success("success", "Files that have been successfully fetched from Azure storage are transferred to this relationship"); +const core::Relationship FetchAzureDataLakeStorage::Failure("failure", "In case of fetch failure flowfiles are transferred to this relationship"); + +void FetchAzureDataLakeStorage::initialize() { + // Add new supported properties + setSupportedProperties({ + AzureStorageCredentialsService, + FilesystemName, + DirectoryName, + FileName, + RangeStart, + RangeLength, + NumberOfRetries + }); + // Set the supported relationships + setSupportedRelationships({ + Success, + Failure + }); +} + +std::optional<storage::FetchAzureDataLakeStorageParameters> FetchAzureDataLakeStorage::buildFetchParameters( + core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) { + storage::FetchAzureDataLakeStorageParameters params; + if (!setCommonParameters(params, context, flow_file)) { + return std::nullopt; + } + + std::string value; + if (context.getProperty(RangeStart, value, flow_file)) { + params.range_start = std::stoull(value); + logger_->log_debug("Range Start property set to %llu", *params.range_start); + } + + if (context.getProperty(RangeLength, value, flow_file)) { + params.range_length = std::stoull(value); + logger_->log_debug("Range Length property set to %llu", *params.range_length); + } + + if (context.getProperty(NumberOfRetries, value, flow_file)) { + params.number_of_retries = std::stoull(value); + 
logger_->log_debug("Number Of Retries property set to %llu", *params.number_of_retries); + } + + return params; +} + +void FetchAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); + logger_->log_debug("FetchAzureDataLakeStorage onTrigger"); + std::shared_ptr<core::FlowFile> flow_file = session->get(); + if (!flow_file) { + context->yield(); + return; + } + + const auto params = buildFetchParameters(*context, flow_file); + if (!params) { + session->transfer(flow_file, Failure); + return; + } + + auto fetched_flow_file = session->create(flow_file); + WriteCallback callback(azure_data_lake_storage_, *params, logger_); + session->write(fetched_flow_file, &callback); + + if (callback.getResult() == std::nullopt) { + logger_->log_error("Failed to fetch file '%s' from Azure Data Lake storage", params->filename); + session->transfer(flow_file, Failure); + session->remove(fetched_flow_file); + } else { + logger_->log_debug("Successfully fetched file '%s' from filesystem '%s' on Azure Data Lake storage", params->filename, params->file_system_name); + session->transfer(fetched_flow_file, Success); + session->remove(flow_file); + } +} + +REGISTER_RESOURCE(FetchAzureDataLakeStorage, "Fetch the provided file from Azure Data Lake Storage Gen 2"); + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h b/extensions/azure/processors/FetchAzureDataLakeStorage.h new file mode 100644 index 0000000..4768e49 --- /dev/null +++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h @@ -0,0 +1,100 @@ +/** + * @file FetchAzureDataLakeStorage.h + * FetchAzureDataLakeStorage class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <utility> +#include <memory> + +#include "AzureDataLakeStorageProcessorBase.h" + +template<typename AzureDataLakeStorageProcessor> +class AzureDataLakeStorageTestsFixture; + +namespace org::apache::nifi::minifi::azure::processors { + +class FetchAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase { + public: + // Supported Properties + EXTENSIONAPI static const core::Property RangeStart; + EXTENSIONAPI static const core::Property RangeLength; + EXTENSIONAPI static const core::Property NumberOfRetries; + + // Supported Relationships + static const core::Relationship Failure; + static const core::Relationship Success; + + explicit FetchAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier()) + : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger()) { + } + + ~FetchAzureDataLakeStorage() override = default; + + void initialize() override; + void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; + + private: + friend class ::AzureDataLakeStorageTestsFixture<FetchAzureDataLakeStorage>; + + class WriteCallback : public 
OutputStreamCallback { + public: + WriteCallback(storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::FetchAzureDataLakeStorageParameters& params, std::shared_ptr<core::logging::Logger> logger) + : azure_data_lake_storage_(azure_data_lake_storage), + params_(params), + logger_(std::move(logger)) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + result_size_ = azure_data_lake_storage_.fetchFile(params_, *stream); + if (!result_size_) { + return 0; + } + + return gsl::narrow<int64_t>(*result_size_); + } + + auto getResult() const { + return result_size_; + } + + private: + storage::AzureDataLakeStorage& azure_data_lake_storage_; + const storage::FetchAzureDataLakeStorageParameters& params_; + std::optional<uint64_t> result_size_ = std::nullopt; + std::shared_ptr<core::logging::Logger> logger_; + }; + + core::annotation::Input getInputRequirement() const override { + return core::annotation::Input::INPUT_REQUIRED; + } + + bool isSingleThreaded() const override { + return true; + } + + explicit FetchAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client) + : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) { + } + + std::optional<storage::FetchAzureDataLakeStorageParameters> buildFetchParameters(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file); +}; + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp index 2be5292..34e2d3b 100644 --- a/extensions/azure/storage/AzureDataLakeStorage.cpp +++ b/extensions/azure/storage/AzureDataLakeStorage.cpp @@ -21,6 +21,7 @@ #include "AzureDataLakeStorage.h" #include "AzureDataLakeStorageClient.h" +#include "io/StreamPipe.h" 
namespace org::apache::nifi::minifi::azure::storage { @@ -52,7 +53,7 @@ UploadDataLakeStorageResult AzureDataLakeStorage::uploadFile(const PutAzureDataL } } -bool AzureDataLakeStorage::deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params) { +bool AzureDataLakeStorage::deleteFile(const DeleteAzureDataLakeStorageParameters& params) { try { return data_lake_storage_client_->deleteFile(params); } catch (const std::exception& ex) { @@ -61,4 +62,14 @@ bool AzureDataLakeStorage::deleteFile(const storage::DeleteAzureDataLakeStorageP } } +std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream) { + try { + auto result = data_lake_storage_client_->fetchFile(params); + return internal::pipe(result.get(), &stream); + } catch (const std::exception& ex) { + logger_->log_error("An exception occurred while fetching '%s/%s' of filesystem '%s': %s", params.directory_name, params.filename, params.file_system_name, ex.what()); + return std::nullopt; + } +} + } // namespace org::apache::nifi::minifi::azure::storage diff --git a/extensions/azure/storage/AzureDataLakeStorage.h b/extensions/azure/storage/AzureDataLakeStorage.h index 8dd9b8a..d1291a2 100644 --- a/extensions/azure/storage/AzureDataLakeStorage.h +++ b/extensions/azure/storage/AzureDataLakeStorage.h @@ -28,6 +28,7 @@ #include "core/logging/Logger.h" #include "core/logging/LoggerConfiguration.h" #include "DataLakeStorageClient.h" +#include "azure/core/io/body_stream.hpp" namespace org::apache::nifi::minifi::azure::storage { @@ -48,6 +49,7 @@ class AzureDataLakeStorage { storage::UploadDataLakeStorageResult uploadFile(const storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer); bool deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params); + std::optional<uint64_t> fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream); private: 
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorage>::getLogger()}; diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp index 7a54a69..e542432 100644 --- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp +++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp @@ -18,7 +18,11 @@ * limitations under the License. */ +#include <utility> + #include "AzureDataLakeStorageClient.h" +#include "azure/core/http/http.hpp" +#include "azure/storage/files/datalake/datalake_options.hpp" #include "azure/identity.hpp" @@ -30,30 +34,35 @@ AzureDataLakeStorageClient::AzureDataLakeStorageClient() { utils::AzureSdkLogger::initialize(); } -void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name) { - if (client_ && credentials_ == credentials && file_system_name_ == file_system_name) { +void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) { + if (client_ && credentials_ == credentials && file_system_name_ == file_system_name && number_of_retries_ == number_of_retries) { logger_->log_debug("Azure Data Lake Storge client credentials have not changed, no need to reset client"); return; } + Azure::Storage::Files::DataLake::DataLakeClientOptions options; + if (number_of_retries) { + options.Retry.MaxRetries = *number_of_retries; + } + if (credentials.getUseManagedIdentityCredentials()) { auto datalake_service_client = Azure::Storage::Files::DataLake::DataLakeServiceClient( - "https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>()); - + "https://" + credentials.getStorageAccountName() + ".dfs." 
+ credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>(), options); client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name)); logger_->log_debug("Azure Data Lake Storge client has been reset with new managed identity credentials."); } else { client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>( - Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name)); + Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name, options)); logger_->log_debug("Azure Data Lake Storge client has been reset with new connection string credentials."); } file_system_name_ = file_system_name; credentials_ = credentials; + number_of_retries_ = number_of_retries; } Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageParameters& params) { - resetClientIfNeeded(params.credentials, params.file_system_name); + resetClientIfNeeded(params.credentials, params.file_system_name, params.number_of_retries); auto directory_client = client_->GetDirectoryClient(params.directory_name); if (!params.directory_name.empty()) { @@ -80,4 +89,22 @@ bool AzureDataLakeStorageClient::deleteFile(const DeleteAzureDataLakeStoragePara return result.Value.Deleted; } +std::unique_ptr<io::InputStream> AzureDataLakeStorageClient::fetchFile(const FetchAzureDataLakeStorageParameters& params) { + auto file_client = getFileClient(params); + Azure::Storage::Files::DataLake::DownloadFileOptions options; + if (params.range_start || params.range_length) { + Azure::Core::Http::HttpRange range; + if (params.range_start) { + range.Offset = *params.range_start; + } + + if (params.range_length) { + range.Length = *params.range_length; + } + 
options.Range = range; + } + auto result = file_client.Download(options); + return std::make_unique<AzureDataLakeStorageInputStream>(std::move(result.Value)); +} + } // namespace org::apache::nifi::minifi::azure::storage diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h index e7d933d..cd8d791 100644 --- a/extensions/azure/storage/AzureDataLakeStorageClient.h +++ b/extensions/azure/storage/AzureDataLakeStorageClient.h @@ -21,6 +21,7 @@ #include <string> #include <memory> +#include <utility> #include <azure/storage/files/datalake.hpp> @@ -56,12 +57,38 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient { */ bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) override; + /** + * Fetches a file from the Azure Data Lake Storage + * @param params Parameters required for connecting and file access on Azure + * @return Download result of Azure Data Lake storage client + */ + std::unique_ptr<io::InputStream> fetchFile(const FetchAzureDataLakeStorageParameters& params) override; + private: - void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name); + class AzureDataLakeStorageInputStream : public io::InputStream { + public: + explicit AzureDataLakeStorageInputStream(Azure::Storage::Files::DataLake::Models::DownloadFileResult&& result) + : result_(std::move(result)) { + } + + size_t size() const override { + return result_.Body->Length(); + } + + size_t read(uint8_t *value, size_t len) override { + return result_.Body->Read(value, len); + } + + private: + Azure::Storage::Files::DataLake::Models::DownloadFileResult result_; + }; + + void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries); Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageParameters& params); AzureStorageCredentials 
credentials_; std::string file_system_name_; + std::optional<uint64_t> number_of_retries_; std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> client_; std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorageClient>::getLogger()}; }; diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/DataLakeStorageClient.h index 4e97d72..eb57d62 100644 --- a/extensions/azure/storage/DataLakeStorageClient.h +++ b/extensions/azure/storage/DataLakeStorageClient.h @@ -21,10 +21,12 @@ #include <string> #include <optional> +#include <memory> #include "AzureStorageCredentials.h" #include "utils/gsl.h" +#include "io/InputStream.h" namespace org::apache::nifi::minifi::azure::storage { @@ -33,6 +35,7 @@ struct AzureDataLakeStorageParameters { std::string file_system_name; std::string directory_name; std::string filename; + std::optional<uint64_t> number_of_retries; }; struct PutAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters { @@ -41,11 +44,17 @@ struct PutAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters using DeleteAzureDataLakeStorageParameters = AzureDataLakeStorageParameters; +struct FetchAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters { + std::optional<uint64_t> range_start; + std::optional<uint64_t> range_length; +}; + class DataLakeStorageClient { public: virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0; virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) = 0; virtual bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) = 0; + virtual std::unique_ptr<io::InputStream> fetchFile(const FetchAzureDataLakeStorageParameters& params) = 0; virtual ~DataLakeStorageClient() = default; }; diff --git a/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h 
index 6878e71..1ef56b3 100644 --- a/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h +++ b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h @@ -71,19 +71,32 @@ class AzureDataLakeStorageTestsFixture { update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true); plan_->addProcessor(azure_data_lake_storage_, "AzureDataLakeStorageProcessor", { {"success", "d"}, {"failure", "d"} }, true); auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true); - logattribute->setAutoTerminatedRelationships({{"success", "d"}}); - putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false); - plan_->addConnection(azure_data_lake_storage_, {"failure", "d"}, putfile_); - putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}}); - output_dir_ = test_controller_.createTempDirectory(); - plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_); + success_putfile_ = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false); + plan_->addConnection(logattribute, {"success", "d"}, success_putfile_); + success_putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}}); + success_output_dir_ = test_controller_.createTempDirectory(); + plan_->setProperty(success_putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir_); + + failure_putfile_ = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false); + plan_->addConnection(azure_data_lake_storage_, {"failure", "d"}, failure_putfile_); + failure_putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}}); + failure_output_dir_ = test_controller_.createTempDirectory(); + plan_->setProperty(failure_putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), failure_output_dir_); 
azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); setDefaultProperties(); } std::vector<std::string> getFailedFlowFileContents() { + return getFileContents(failure_output_dir_); + } + + std::vector<std::string> getSuccessfulFlowFileContents() { + return getFileContents(success_output_dir_); + } + + std::vector<std::string> getFileContents(const std::string& dir) { std::vector<std::string> file_contents; auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool { @@ -92,7 +105,7 @@ class AzureDataLakeStorageTestsFixture { return true; }; - utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false); + utils::file::FileUtils::list_dir(dir, lambda, plan_->getLogger(), false); return file_contents; } @@ -116,7 +129,9 @@ class AzureDataLakeStorageTestsFixture { std::shared_ptr<core::Processor> azure_data_lake_storage_; std::shared_ptr<core::Processor> get_file_; std::shared_ptr<core::Processor> update_attribute_; - std::shared_ptr<core::Processor> putfile_; + std::shared_ptr<core::Processor> success_putfile_; + std::shared_ptr<core::Processor> failure_putfile_; std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_; - std::string output_dir_; + std::string failure_output_dir_; + std::string success_output_dir_; }; diff --git a/libminifi/test/azure-tests/FetchAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/FetchAzureDataLakeStorageTests.cpp new file mode 100644 index 0000000..a055962 --- /dev/null +++ b/libminifi/test/azure-tests/FetchAzureDataLakeStorageTests.cpp @@ -0,0 +1,133 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AzureDataLakeStorageTestsFixture.h" +#include "processors/FetchAzureDataLakeStorage.h" +#include "controllerservices/AzureStorageCredentialsService.h" + +namespace { + +using namespace std::chrono_literals; + +/* Shared Azure Data Lake test fixture instantiated for the FetchAzureDataLakeStorage processor. */ using FetchAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::FetchAzureDataLakeStorage>; + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") { + plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::AzureStorageCredentialsService.getName(), ""); + REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); + REQUIRE(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with account name and SAS token set", "[azureDataLakeStorageParameters]") { + setDefaultProperties(); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(),
""); + test_controller_.runSession(plan_, true); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams(); + CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token"); + CHECK(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with connection string override", "[azureDataLakeStorageParameters]") { + setDefaultProperties(); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT"); + test_controller_.runSession(plan_, true); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams(); + CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); + CHECK(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with managed identity use", "[azureDataLakeStorageParameters]") { + setDefaultProperties(); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "test"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::UseManagedIdentityCredentials.getName(), "true"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT"); + test_controller_.runSession(plan_, true); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams(); +
/* the "test" connection string set above must not appear in the credentials handed to the client */ CHECK(passed_params.credentials.buildConnectionString().empty()); + CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT"); + CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net"); + CHECK(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") { + plan_->setProperty(update_attribute_, "test.filesystemname", "", true); + test_controller_.runSession(plan_, true); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + CHECK(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!")); + auto failed_flowfiles = getFailedFlowFileContents(); + REQUIRE(failed_flowfiles.size() == 1); + REQUIRE(failed_flowfiles[0] == TEST_DATA); +} + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Connection String is empty", "[azureDataLakeStorageParameters]") { + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), ""); + REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); + REQUIRE(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch full file succeeds", "[azureDataLakeStorageFetch]") { + test_controller_.runSession(plan_, true); + REQUIRE(getFailedFlowFileContents().size() == 0); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams(); + CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); + CHECK(passed_params.file_system_name == FILESYSTEM_NAME); + CHECK(passed_params.directory_name == DIRECTORY_NAME); + CHECK(passed_params.filename == GETFILE_FILE_NAME); + CHECK(passed_params.range_start == std::nullopt); + CHECK(passed_params.range_length == std::nullopt); + auto success_contents = getSuccessfulFlowFileContents(); + REQUIRE(success_contents.size() == 1); +
REQUIRE(success_contents[0] == mock_data_lake_storage_client_ptr_->FETCHED_DATA); +} + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch a range of the file succeeds", "[azureDataLakeStorageFetch]") { + plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::RangeStart.getName(), "5"); + plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::RangeLength.getName(), "10"); + test_controller_.runSession(plan_, true); + REQUIRE(getFailedFlowFileContents().size() == 0); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams(); + CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); + CHECK(passed_params.file_system_name == FILESYSTEM_NAME); + CHECK(passed_params.directory_name == DIRECTORY_NAME); + CHECK(passed_params.filename == GETFILE_FILE_NAME); + CHECK(passed_params.range_start == 5); /* compare the optionals themselves: an unset range fails the check instead of dereferencing an empty optional */ + CHECK(passed_params.range_length == 10); + auto success_contents = getSuccessfulFlowFileContents(); + REQUIRE(success_contents.size() == 1); + REQUIRE(success_contents[0] == mock_data_lake_storage_client_ptr_->FETCHED_DATA.substr(5, 10)); +} + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Number of Retries is set", "[azureDataLakeStorageFetch]") { + plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::NumberOfRetries.getName(), "1"); + test_controller_.runSession(plan_, true); + REQUIRE(mock_data_lake_storage_client_ptr_->getPassedFetchParams().number_of_retries == 1); +} + +TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch full file fails", "[azureDataLakeStorageFetch]") { + mock_data_lake_storage_client_ptr_->setFetchFailure(true); + test_controller_.runSession(plan_, true); + REQUIRE(getSuccessfulFlowFileContents().size() == 0); + auto failed_contents = getFailedFlowFileContents(); + REQUIRE(failed_contents.size() == 1); + REQUIRE(failed_contents[0] ==
TEST_DATA); +} + +} // namespace diff --git a/libminifi/test/azure-tests/MockDataLakeStorageClient.h b/libminifi/test/azure-tests/MockDataLakeStorageClient.h index eeac4dc..8969e1e 100644 --- a/libminifi/test/azure-tests/MockDataLakeStorageClient.h +++ b/libminifi/test/azure-tests/MockDataLakeStorageClient.h @@ -20,12 +20,17 @@ #include <string> #include <stdexcept> +#include <memory> +#include <utility> +#include <vector> #include "storage/DataLakeStorageClient.h" +#include "io/BufferStream.h" class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::storage::DataLakeStorageClient { public: const std::string PRIMARY_URI = "http://test-uri/file"; + const std::string FETCHED_DATA = "test azure data for stream"; bool createFile(const org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters& /*params*/) override { if (file_creation_error_) { @@ -55,6 +60,27 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora return delete_result_; } + /* Mock fetch: throws std::runtime_error when fetch_fails_ is set (params are then not recorded); otherwise stores params in fetch_params_ and returns the requested slice of FETCHED_DATA as a BufferStream. The requested range is not bounds-checked against FETCHED_DATA. */ std::unique_ptr<org::apache::nifi::minifi::io::InputStream> fetchFile(const org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters& params) override { + if (fetch_fails_) { + throw std::runtime_error("error"); + } + + fetch_params_ = params; + buffer_.clear(); + uint64_t range_start = 0; + uint64_t size = FETCHED_DATA.size(); + if (params.range_start) { + range_start = *params.range_start; + } + + if (params.range_length) { + size = *params.range_length; + } + + buffer_.assign(FETCHED_DATA.begin() + range_start, FETCHED_DATA.begin() + range_start + size); + return std::make_unique<org::apache::nifi::minifi::io::BufferStream>(buffer_.data(), buffer_.size()); + } + void setFileCreation(bool create_file) { create_file_ = create_file; } @@ -75,6 +101,10 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora delete_result_ = delete_result; } + void setFetchFailure(bool fetch_fails) { + fetch_fails_ = fetch_fails; + } +
/* Returns a copy of put_params_. */ org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters getPassedPutParams() const { return put_params_; } @@ -83,6 +113,10 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora return delete_params_; } + /* Returns a copy of fetch_params_, which fetchFile() assigns only when it does not throw. */ org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters getPassedFetchParams() const { + return fetch_params_; + } + private: const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas"; bool create_file_ = true; @@ -90,7 +124,10 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora bool upload_fails_ = false; bool delete_fails_ = false; bool delete_result_ = true; + /* when true, fetchFile() throws std::runtime_error; set through setFetchFailure() */ bool fetch_fails_ = false; std::string input_data_; + /* bytes selected by the last fetchFile() call; its data()/size() are passed to the returned BufferStream */ std::vector<uint8_t> buffer_; org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters put_params_; org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters delete_params_; + org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters fetch_params_; };
