This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 6074634dcdb18f86d599aa9cfaee4f09e05752c8 Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Dec 1 14:07:22 2021 +0100 MINIFICPP-1629 Add DeleteAzureDataLakeStorage processor Closes #1195 Signed-off-by: Marton Szasz <[email protected]> --- PROCESSORS.md | 29 +++- README.md | 2 +- .../AzureDataLakeStorageProcessorBase.cpp | 81 ++++++++++ .../processors/AzureDataLakeStorageProcessorBase.h | 64 ++++++++ .../azure/processors/AzureStorageProcessorBase.cpp | 6 +- .../azure/processors/AzureStorageProcessorBase.h | 2 +- .../processors/DeleteAzureDataLakeStorage.cpp | 85 +++++++++++ .../azure/processors/DeleteAzureDataLakeStorage.h | 67 ++++++++ .../azure/processors/PutAzureBlobStorage.cpp | 2 +- .../azure/processors/PutAzureDataLakeStorage.cpp | 55 ++----- .../azure/processors/PutAzureDataLakeStorage.h | 28 ++-- extensions/azure/storage/AzureDataLakeStorage.cpp | 9 ++ extensions/azure/storage/AzureDataLakeStorage.h | 1 + .../azure/storage/AzureDataLakeStorageClient.cpp | 8 +- .../azure/storage/AzureDataLakeStorageClient.h | 9 +- extensions/azure/storage/DataLakeStorageClient.h | 8 +- .../tests/unit/GenerateFlowFileTests.cpp | 9 +- .../azure-tests/AzureDataLakeStorageTestsFixture.h | 122 +++++++++++++++ .../DeleteAzureDataLakeStorageTests.cpp | 128 ++++++++++++++++ .../test/azure-tests/MockDataLakeStorageClient.h | 96 ++++++++++++ .../test/azure-tests/PutAzureBlobStorageTests.cpp | 3 +- .../azure-tests/PutAzureDataLakeStorageTests.cpp | 168 ++------------------- 22 files changed, 747 insertions(+), 235 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 2a1bcbb..de7eadf 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -14,6 +14,7 @@ - [ConsumeKafka](#consumekafka) - [ConsumeMQTT](#consumemqtt) - [DefragmentText](#defragmenttext) +- [DeleteAzureDataLakeStorage](#deleteazuredatalakestorage) - [DeleteS3Object](#deletes3object) - [ExecuteProcess](#executeprocess) - [ExecutePythonProcessor](#executepythonprocessor) @@ -328,6 +329,30 @@ In the list below, the names of required properties appear in bold. Any other pr |failure|Flowfiles that failed the defragmentation process| +## DeleteAzureDataLakeStorage + +### Description + +Deletes the provided file from Azure Data Lake Storage Gen 2 +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +| - | - | - | - | +|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.| +|File Name|||The filename in Azure Storage. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**| +|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**| +|Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**| + +### Relationships + +| Name | Description | +| - | - | +|failure|If file deletion from Azure Data Lake storage fails the flowfile is transferred to this relationship| +|success|If file deletion from Azure Data Lake storage succeeds the flowfile is transferred to this relationship| + + ## DeleteS3Object ### Description @@ -1287,10 +1312,10 @@ In the list below, the names of required properties appear in bold. Any other pr | Name | Default Value | Allowable Values | Description | | - | - | - | - | |**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.| +|**Conflict Resolution Strategy**|fail|fail<br/>replace<br/>ignore|Indicates what should happen when a file with the same name already exists in the output directory.| +|File Name|||The filename in Azure Storage. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**| |**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**| |Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**| -|File Name|||The filename to be uploaded. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**| -|**Conflict Resolution Strategy**|fail|fail<br/>replace<br/>ignore|Indicates what should happen when a file with the same name already exists in the output directory.| ### Relationships diff --git a/README.md b/README.md index 3aa08be..0bd2328 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension | ------------- |:-------------| :-----| | Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) | -DBUILD_LIBARCHIVE=ON | | AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON | -| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage) | -DENABLE_AZURE=ON | +| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage) | -DENABLE_AZURE=ON | | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON | | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON | | GPS | GetGPS | -DENABLE_GPS=ON | diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp new file mode 100644 index 0000000..04d7664 --- /dev/null +++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp @@ -0,0 +1,81 @@ +/** + * @file AzureDataLakeStorageProcessorBase.cpp + * AzureDataLakeStorageProcessorBase class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AzureDataLakeStorageProcessorBase.h" + +#include "utils/ProcessorConfigUtils.h" +#include "controllerservices/AzureStorageCredentialsService.h" + +namespace org::apache::nifi::minifi::azure::processors { + +const core::Property AzureDataLakeStorageProcessorBase::FilesystemName( + core::PropertyBuilder::createProperty("Filesystem Name") + ->withDescription("Name of the Azure Storage File System. It is assumed to be already existing.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); +const core::Property AzureDataLakeStorageProcessorBase::DirectoryName( + core::PropertyBuilder::createProperty("Directory Name") + ->withDescription("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. " + "If left empty it designates the root directory. The directory will be created if not already existing.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property AzureDataLakeStorageProcessorBase::FileName( + core::PropertyBuilder::createProperty("File Name") + ->withDescription("The filename in Azure Storage. If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +void AzureDataLakeStorageProcessorBase::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) { + gsl_Expects(context); + std::optional<storage::AzureStorageCredentials> credentials; + std::tie(std::ignore, credentials) = getCredentialsFromControllerService(*context); + if (!credentials) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid"); + } + + if (!credentials->isValid()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Credentials set in the Azure Storage credentials service are invalid"); + } + + credentials_ = *credentials; +} + +bool AzureDataLakeStorageProcessorBase::setCommonParameters( + storage::AzureDataLakeStorageParameters& params, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) { + params.credentials = credentials_; + + if (!context.getProperty(FilesystemName, params.file_system_name, flow_file) || params.file_system_name.empty()) { + logger_->log_error("Filesystem Name '%s' is invalid or empty!", params.file_system_name); + return false; + } + + context.getProperty(DirectoryName, params.directory_name, flow_file); + + context.getProperty(FileName, params.filename, flow_file); + if (params.filename.empty() && (!flow_file->getAttribute("filename", params.filename) || params.filename.empty())) { + logger_->log_error("No File Name is set and default object key 'filename' attribute could not be found!"); + return false; + } + + return true; +} + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h new file mode 100644 index 0000000..14f2a92 --- /dev/null +++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h @@ -0,0 +1,64 @@ +/** + * @file AzureDataLakeStorageProcessorBase.h + * AzureDataLakeStorageProcessorBase class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <utility> +#include <string> +#include <memory> +#include <optional> + +#include "core/Property.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "storage/AzureDataLakeStorage.h" +#include "AzureStorageProcessorBase.h" + +namespace org::apache::nifi::minifi::azure::processors { + +class AzureDataLakeStorageProcessorBase : public AzureStorageProcessorBase { + public: + // Supported Properties + EXTENSIONAPI static const core::Property FilesystemName; + EXTENSIONAPI static const core::Property DirectoryName; + EXTENSIONAPI static const core::Property FileName; + + explicit AzureDataLakeStorageProcessorBase(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger> &logger) + : AzureStorageProcessorBase(name, uuid, logger) { + } + + ~AzureDataLakeStorageProcessorBase() override = default; + + void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; + + protected: + explicit AzureDataLakeStorageProcessorBase(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger> &logger, + std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client) + : AzureStorageProcessorBase(name, uuid, logger), + azure_data_lake_storage_(std::move(data_lake_storage_client)) { + } + + bool setCommonParameters(storage::AzureDataLakeStorageParameters& params, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file); + + storage::AzureStorageCredentials credentials_; + storage::AzureDataLakeStorage azure_data_lake_storage_; +}; + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/AzureStorageProcessorBase.cpp b/extensions/azure/processors/AzureStorageProcessorBase.cpp index 63b230e..a5a2426 100644 --- a/extensions/azure/processors/AzureStorageProcessorBase.cpp +++ b/extensions/azure/processors/AzureStorageProcessorBase.cpp @@ -33,13 +33,13 @@ const core::Property AzureStorageProcessorBase::AzureStorageCredentialsService( ->build()); std::tuple<AzureStorageProcessorBase::GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> AzureStorageProcessorBase::getCredentialsFromControllerService( - const std::shared_ptr<core::ProcessContext> &context) const { + core::ProcessContext &context) const { std::string service_name; - if (!context->getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) { + if (!context.getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) { return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_EMPTY, std::nullopt); } - std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name); + std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(service_name); if (nullptr == service) { logger_->log_error("Azure Storage credentials service with name: '%s' could not be found", service_name); return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_INVALID, std::nullopt); diff --git a/extensions/azure/processors/AzureStorageProcessorBase.h b/extensions/azure/processors/AzureStorageProcessorBase.h index f87e827..a85ae7b 100644 --- a/extensions/azure/processors/AzureStorageProcessorBase.h +++ b/extensions/azure/processors/AzureStorageProcessorBase.h @@ -49,7 +49,7 @@ class AzureStorageProcessorBase : public core::Processor { CONTROLLER_NAME_INVALID }; - std::tuple<GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> getCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const; + std::tuple<GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> getCredentialsFromControllerService(core::ProcessContext &context) const; std::shared_ptr<core::logging::Logger> logger_; }; diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp new file mode 100644 index 0000000..6146a31 --- /dev/null +++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp @@ -0,0 +1,85 @@ +/** + * @file DeleteAzureDataLakeStorage.cpp + * DeleteAzureDataLakeStorage class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "DeleteAzureDataLakeStorage.h" + +#include "utils/ProcessorConfigUtils.h" +#include "utils/gsl.h" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::azure::processors { + +const core::Relationship DeleteAzureDataLakeStorage::Success("success", "If file deletion from Azure storage succeeds the flowfile is transferred to this relationship"); +const core::Relationship DeleteAzureDataLakeStorage::Failure("failure", "If file deletion from Azure storage fails the flowfile is transferred to this relationship"); + +void DeleteAzureDataLakeStorage::initialize() { + // Set the supported properties + setSupportedProperties({ + AzureStorageCredentialsService, + FilesystemName, + DirectoryName, + FileName + }); + + // Set the supported relationships + setSupportedRelationships({ + Success, + Failure + }); +} + +std::optional<storage::DeleteAzureDataLakeStorageParameters> DeleteAzureDataLakeStorage::buildDeleteParameters( + core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) { + storage::DeleteAzureDataLakeStorageParameters params; + if (!setCommonParameters(params, context, flow_file)) { + return std::nullopt; + } + + return params; +} + +void DeleteAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); + logger_->log_trace("DeleteAzureDataLakeStorage onTrigger"); + std::shared_ptr<core::FlowFile> flow_file = session->get(); + if (!flow_file) { + context->yield(); + return; + } + + const auto params = buildDeleteParameters(*context, flow_file); + if (!params) { + session->transfer(flow_file, Failure); + return; + } + + auto result = azure_data_lake_storage_.deleteFile(*params); + if (!result) { + logger_->log_error("Failed to delete file '%s' to Azure Data Lake storage", params->filename); + session->transfer(flow_file, Failure); + } else { + logger_->log_debug("Successfully deleted file '%s' of filesystem '%s' on Azure Data Lake storage", params->filename, params->file_system_name); + session->transfer(flow_file, Success); + } +} + +REGISTER_RESOURCE(DeleteAzureDataLakeStorage, "Deletes the provided file from Azure Data Lake Storage"); + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.h b/extensions/azure/processors/DeleteAzureDataLakeStorage.h new file mode 100644 index 0000000..cb26a2f --- /dev/null +++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.h @@ -0,0 +1,67 @@ +/** + * @file DeleteAzureDataLakeStorage.h + * DeleteAzureDataLakeStorage class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <utility> +#include <memory> + +#include "AzureDataLakeStorageProcessorBase.h" + +template<typename AzureDataLakeStorageProcessorBase> +class AzureDataLakeStorageTestsFixture; + +namespace org::apache::nifi::minifi::azure::processors { + +class DeleteAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase { + public: + // Supported Relationships + static const core::Relationship Failure; + static const core::Relationship Success; + + explicit DeleteAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier()) + : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger()) { + } + + ~DeleteAzureDataLakeStorage() override = default; + + void initialize() override; + void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; + + private: + friend class ::AzureDataLakeStorageTestsFixture<DeleteAzureDataLakeStorage>; + + core::annotation::Input getInputRequirement() const override { + return core::annotation::Input::INPUT_REQUIRED; + } + + bool isSingleThreaded() const override { + return true; + } + + explicit DeleteAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client) + : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) { + } + + std::optional<storage::DeleteAzureDataLakeStorageParameters> buildDeleteParameters(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file); +}; + +} // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp b/extensions/azure/processors/PutAzureBlobStorage.cpp index e5f66fd..7913ca8 100644 --- a/extensions/azure/processors/PutAzureBlobStorage.cpp +++ b/extensions/azure/processors/PutAzureBlobStorage.cpp @@ -201,7 +201,7 @@ std::optional<storage::PutAzureBlobStorageParameters> PutAzureBlobStorage::build std::optional<storage::AzureStorageCredentials> PutAzureBlobStorage::getCredentials( const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file) const { - auto [result, controller_service_creds] = getCredentialsFromControllerService(context); + auto [result, controller_service_creds] = getCredentialsFromControllerService(*context); if (controller_service_creds) { if (controller_service_creds->isValid()) { logger_->log_debug("Azure credentials read from credentials controller service!"); diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp b/extensions/azure/processors/PutAzureDataLakeStorage.cpp index 5620737..d197d2f 100644 --- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp +++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp @@ -20,30 +20,14 @@ #include "PutAzureDataLakeStorage.h" +#include <vector> + #include "utils/ProcessorConfigUtils.h" #include "utils/gsl.h" -#include "controllerservices/AzureStorageCredentialsService.h" #include "core/Resource.h" namespace org::apache::nifi::minifi::azure::processors { -const core::Property PutAzureDataLakeStorage::FilesystemName( - core::PropertyBuilder::createProperty("Filesystem Name") - ->withDescription("Name of the Azure Storage File System. It is assumed to be already existing.") - ->supportsExpressionLanguage(true) - ->isRequired(true) - ->build()); -const core::Property PutAzureDataLakeStorage::DirectoryName( - core::PropertyBuilder::createProperty("Directory Name") - ->withDescription("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. " - "If left empty it designates the root directory. The directory will be created if not already existing.") - ->supportsExpressionLanguage(true) - ->build()); -const core::Property PutAzureDataLakeStorage::FileName( - core::PropertyBuilder::createProperty("File Name") - ->withDescription("The filename to be uploaded. If left empty the filename attribute will be used by default.") - ->supportsExpressionLanguage(true) - ->build()); const core::Property PutAzureDataLakeStorage::ConflictResolutionStrategy( core::PropertyBuilder::createProperty("Conflict Resolution Strategy") ->withDescription("Indicates what should happen when a file with the same name already exists in the output directory.") @@ -71,46 +55,27 @@ void PutAzureDataLakeStorage::initialize() { }); } -void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) { - std::optional<storage::AzureStorageCredentials> credentials; - std::tie(std::ignore, credentials) = getCredentialsFromControllerService(context); - if (!credentials) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid"); - } - - if (!credentials->isValid()) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service properties are not set or invalid"); - } - - credentials_ = *credentials; +void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) { + gsl_Expects(context && sessionFactory); + AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory); conflict_resolution_strategy_ = FileExistsResolutionStrategy::parse( utils::parsePropertyWithAllowableValuesOrThrow(*context, ConflictResolutionStrategy.getName(), FileExistsResolutionStrategy::values()).c_str()); } std::optional<storage::PutAzureDataLakeStorageParameters> PutAzureDataLakeStorage::buildUploadParameters( - const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) { + core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) { storage::PutAzureDataLakeStorageParameters params; - params.credentials = credentials_; - params.replace_file = conflict_resolution_strategy_ == FileExistsResolutionStrategy::REPLACE_FILE; - - if (!context->getProperty(FilesystemName, params.file_system_name, flow_file) || params.file_system_name.empty()) { - logger_->log_error("Filesystem Name '%s' is invalid or empty!", params.file_system_name); - return std::nullopt; - } - - context->getProperty(DirectoryName, params.directory_name, flow_file); - - context->getProperty(FileName, params.filename, flow_file); - if (params.filename.empty() && (!flow_file->getAttribute("filename", params.filename) || params.filename.empty())) { - logger_->log_error("No File Name is set and default object key 'filename' attribute could not be found!"); + if (!setCommonParameters(params, context, flow_file)) { return std::nullopt; } + params.replace_file = conflict_resolution_strategy_ == FileExistsResolutionStrategy::REPLACE_FILE; return params; } void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); logger_->log_trace("PutAzureDataLakeStorage onTrigger"); std::shared_ptr<core::FlowFile> flow_file = session->get(); if (!flow_file) { @@ -118,7 +83,7 @@ void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessConte return; } - const auto params = buildUploadParameters(context, flow_file); + const auto params = buildUploadParameters(*context, flow_file); if (!params) { session->transfer(flow_file, Failure); return; diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h index 67ae6c9..a268711 100644 --- a/extensions/azure/processors/PutAzureDataLakeStorage.h +++ b/extensions/azure/processors/PutAzureDataLakeStorage.h @@ -20,30 +20,23 @@ #pragma once -#include <utility> #include <string> +#include <utility> #include <memory> -#include <optional> -#include <vector> -#include "core/Property.h" -#include "core/logging/Logger.h" -#include "core/logging/LoggerConfiguration.h" -#include "storage/AzureDataLakeStorage.h" +#include "AzureDataLakeStorageProcessorBase.h" + #include "utils/Enum.h" #include "utils/Export.h" -#include "AzureStorageProcessorBase.h" -class PutAzureDataLakeStorageTestsFixture; +template<typename AzureDataLakeStorageProcessor> +class AzureDataLakeStorageTestsFixture; namespace org::apache::nifi::minifi::azure::processors { -class PutAzureDataLakeStorage final : public AzureStorageProcessorBase { +class PutAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase { public: // Supported Properties - EXTENSIONAPI static const core::Property FilesystemName; - EXTENSIONAPI static const core::Property DirectoryName; - EXTENSIONAPI static const core::Property FileName; EXTENSIONAPI static const core::Property ConflictResolutionStrategy; // Supported Relationships @@ -65,7 +58,7 @@ class PutAzureDataLakeStorage final : public AzureStorageProcessorBase { void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; private: - friend class ::PutAzureDataLakeStorageTestsFixture; + friend class ::AzureDataLakeStorageTestsFixture<PutAzureDataLakeStorage>; class ReadCallback : public InputStreamCallback { public: @@ -93,15 +86,12 @@ class PutAzureDataLakeStorage final : public AzureStorageProcessorBase { } explicit PutAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client) - : AzureStorageProcessorBase(name, uuid, core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger()), - azure_data_lake_storage_(std::move(data_lake_storage_client)) { + : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) { } - std::optional<storage::PutAzureDataLakeStorageParameters> buildUploadParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file); + std::optional<storage::PutAzureDataLakeStorageParameters> buildUploadParameters(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file); - storage::AzureStorageCredentials credentials_; FileExistsResolutionStrategy conflict_resolution_strategy_; - storage::AzureDataLakeStorage azure_data_lake_storage_; }; } // namespace org::apache::nifi::minifi::azure::processors diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp index 5562933..2be5292 100644 --- a/extensions/azure/storage/AzureDataLakeStorage.cpp +++ b/extensions/azure/storage/AzureDataLakeStorage.cpp @@ -52,4 +52,13 @@ UploadDataLakeStorageResult AzureDataLakeStorage::uploadFile(const PutAzureDataL } } +bool AzureDataLakeStorage::deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params) { + try { + return data_lake_storage_client_->deleteFile(params); + } catch (const std::exception& ex) { + logger_->log_error("An exception occurred while deleting '%s/%s' of filesystem '%s': %s", params.directory_name, params.filename, params.file_system_name, ex.what()); + return false; + } +} + } // namespace org::apache::nifi::minifi::azure::storage diff --git a/extensions/azure/storage/AzureDataLakeStorage.h b/extensions/azure/storage/AzureDataLakeStorage.h index a1ec8f6..8dd9b8a 100644 --- a/extensions/azure/storage/AzureDataLakeStorage.h +++ b/extensions/azure/storage/AzureDataLakeStorage.h @@ -47,6 +47,7 @@ class AzureDataLakeStorage { explicit AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient> data_lake_storage_client = nullptr); storage::UploadDataLakeStorageResult uploadFile(const storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer); + bool deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params); private: std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorage>::getLogger()}; diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp index e56b967..7a54a69 100644 --- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp +++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp @@ -52,7 +52,7 @@ void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentia credentials_ = credentials; } -Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const PutAzureDataLakeStorageParameters& params) { +Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageParameters& params) { resetClientIfNeeded(params.credentials, params.file_system_name); auto directory_client = client_->GetDirectoryClient(params.directory_name); @@ -74,4 +74,10 @@ std::string AzureDataLakeStorageClient::uploadFile(const PutAzureDataLakeStorage return file_client.GetUrl(); } +bool AzureDataLakeStorageClient::deleteFile(const DeleteAzureDataLakeStorageParameters& params) { + auto file_client = getFileClient(params); + auto result = file_client.Delete(); + return result.Value.Deleted; +} + } // namespace org::apache::nifi::minifi::azure::storage diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h index 3f13fe6..e7d933d 100644 --- a/extensions/azure/storage/AzureDataLakeStorageClient.h +++ b/extensions/azure/storage/AzureDataLakeStorageClient.h @@ -49,9 +49,16 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient { */ std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override; + /** + * Deletes a file on the Azure Data Lake Storage + * @param params Parameters required for connecting and file access on Azure + * @return True if file was deleted, false otherwise + */ + bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) override; + private: void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name); - Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const PutAzureDataLakeStorageParameters& params); + Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageParameters& params); AzureStorageCredentials credentials_; std::string file_system_name_; diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/DataLakeStorageClient.h index b0f470d..4e97d72 100644 --- a/extensions/azure/storage/DataLakeStorageClient.h +++ b/extensions/azure/storage/DataLakeStorageClient.h @@ -28,18 +28,24 @@ namespace org::apache::nifi::minifi::azure::storage { -struct PutAzureDataLakeStorageParameters { +struct AzureDataLakeStorageParameters { AzureStorageCredentials credentials; std::string file_system_name; std::string directory_name; std::string filename; +}; + +struct PutAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters { bool replace_file = false; }; +using DeleteAzureDataLakeStorageParameters = AzureDataLakeStorageParameters; + class DataLakeStorageClient { public: virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0; virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) = 0; + virtual bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) = 0; virtual ~DataLakeStorageClient() = default; }; diff --git a/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp index 5dd99c9..16e2b47 100644 --- a/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp +++ b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp @@ -58,8 +58,7 @@ TEST_CASE("GenerateFlowFileTest", "[generateflowfiletest]") { auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool { std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary); - std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()); - file_contents.push_back(file_content); + file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>())); return true; }; @@ -188,8 +187,7 @@ TEST_CASE("GenerateFlowFileCustomTextTest", "[generateflowfiletest]") { auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool { std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary); - std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()); - file_contents.push_back(file_content); + file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>())); return true; }; @@ -226,8 +224,7 @@ TEST_CASE("GenerateFlowFileCustomTextEmptyTest", "[generateflowfiletest]") { auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool { std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary); - std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()); - file_contents.push_back(file_content); + file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>())); return true; }; diff --git a/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h new file mode 100644 index 0000000..6878e71 --- /dev/null +++ b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h @@ -0,0 +1,122 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <utility> +#include <vector> +#include <memory> +#include <string> + +#include "MockDataLakeStorageClient.h" +#include "../TestBase.h" +#include "utils/TestUtils.h" +#include "utils/IntegrationTestUtils.h" +#include "core/Processor.h" +#include "processors/GetFile.h" +#include "processors/PutFile.h" +#include "processors/LogAttribute.h" +#include "processors/UpdateAttribute.h" +#include "utils/file/FileUtils.h" +#include "controllerservices/AzureStorageCredentialsService.h" + +const std::string FILESYSTEM_NAME = "testfilesystem"; +const std::string DIRECTORY_NAME = "testdir"; +const std::string FILE_NAME = "testfile.txt"; +const std::string CONNECTION_STRING = "test-connectionstring"; +const std::string TEST_DATA = "data123"; +const std::string GETFILE_FILE_NAME = "input_data.log"; + +template<typename AzureDataLakeStorageProcessor> +class AzureDataLakeStorageTestsFixture { + public: + AzureDataLakeStorageTestsFixture() { + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::core::Processor>(); + LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); + LogTestController::getInstance().setTrace<minifi::processors::GetFile>(); + LogTestController::getInstance().setTrace<minifi::processors::PutFile>(); + LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + LogTestController::getInstance().setTrace<AzureDataLakeStorageProcessor>(); + + // Build MiNiFi processing graph + plan_ = test_controller_.createPlan(); + auto mock_data_lake_storage_client = std::make_unique<MockDataLakeStorageClient>(); + mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get(); + azure_data_lake_storage_ = std::shared_ptr<AzureDataLakeStorageProcessor>( + new AzureDataLakeStorageProcessor("AzureDataLakeStorageProcessor", utils::Identifier(), std::move(mock_data_lake_storage_client))); + auto input_dir = test_controller_.createTempDirectory(); + utils::putFileToDir(input_dir, GETFILE_FILE_NAME, TEST_DATA); + + get_file_ = plan_->addProcessor("GetFile", "GetFile"); + plan_->setProperty(get_file_, minifi::processors::GetFile::Directory.getName(), input_dir); + plan_->setProperty(get_file_, minifi::processors::GetFile::KeepSourceFile.getName(), "false"); + + update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true); + plan_->addProcessor(azure_data_lake_storage_, "AzureDataLakeStorageProcessor", { {"success", "d"}, {"failure", "d"} }, true); + auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true); + logattribute->setAutoTerminatedRelationships({{"success", "d"}}); + + putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false); + plan_->addConnection(azure_data_lake_storage_, {"failure", "d"}, putfile_); + putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}}); + output_dir_ = test_controller_.createTempDirectory(); + plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_); + + azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); + setDefaultProperties(); + } + + std::vector<std::string> getFailedFlowFileContents() { + std::vector<std::string> file_contents; + + auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool { + std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary); + file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>())); + return true; + }; + + utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false); + return file_contents; + } + + void setDefaultProperties() { + plan_->setProperty(azure_data_lake_storage_, AzureDataLakeStorageProcessor::AzureStorageCredentialsService.getName(), "AzureStorageCredentialsService"); + plan_->setProperty(update_attribute_, "test.filesystemname", FILESYSTEM_NAME, true); + plan_->setProperty(azure_data_lake_storage_, AzureDataLakeStorageProcessor::FilesystemName.getName(), "${test.filesystemname}"); + plan_->setProperty(update_attribute_, "test.directoryname", DIRECTORY_NAME, true); + plan_->setProperty(azure_data_lake_storage_, AzureDataLakeStorageProcessor::DirectoryName.getName(), "${test.directoryname}"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING); + } + + virtual ~AzureDataLakeStorageTestsFixture() { + LogTestController::getInstance().reset(); + } + + protected: + TestController test_controller_; + std::shared_ptr<TestPlan> plan_; + MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_; + std::shared_ptr<core::Processor> azure_data_lake_storage_; + std::shared_ptr<core::Processor> get_file_; + std::shared_ptr<core::Processor> update_attribute_; + std::shared_ptr<core::Processor> putfile_; + std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_; + std::string output_dir_; +}; diff --git a/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp new file mode 100644 index 0000000..ea485f5 --- /dev/null +++ b/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp @@ -0,0 +1,128 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AzureDataLakeStorageTestsFixture.h" +#include "processors/DeleteAzureDataLakeStorage.h" +#include "controllerservices/AzureStorageCredentialsService.h" + +namespace { + +using namespace std::chrono_literals; + +using DeleteAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::DeleteAzureDataLakeStorage>; + +TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") { + plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::DeleteAzureDataLakeStorage::AzureStorageCredentialsService.getName(), ""); + REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); + REQUIRE(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Test Azure credentials with account name and SAS token set", "[azureDataLakeStorageParameters]") { + setDefaultProperties(); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), ""); + test_controller_.runSession(plan_, true); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams(); + CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token"); + CHECK(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Test Azure credentials with connection string override", "[azureDataLakeStorageParameters]") { + setDefaultProperties(); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT"); + test_controller_.runSession(plan_, true); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams(); + CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); + CHECK(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Test Azure credentials with managed identity use", "[azureDataLakeStorageParameters]") { + setDefaultProperties(); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "test"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::UseManagedIdentityCredentials.getName(), "true"); + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT"); + test_controller_.runSession(plan_, true); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams(); + CHECK(passed_params.credentials.buildConnectionString().empty()); + CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT"); + CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net"); + CHECK(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") { + plan_->setProperty(update_attribute_, "test.filesystemname", "", true); + test_controller_.runSession(plan_, true); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + auto failed_flowfiles = getFailedFlowFileContents(); + CHECK(failed_flowfiles.size() == 1); + CHECK(failed_flowfiles[0] == TEST_DATA); + CHECK(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!")); +} + +TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Connection String is empty", "[azureDataLakeStorageParameters]") { + plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), ""); + REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); + REQUIRE(getFailedFlowFileContents().size() == 0); +} + +TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Delete file succeeds", "[azureDataLakeStorageDelete]") { + test_controller_.runSession(plan_, true); + REQUIRE(getFailedFlowFileContents().size() == 0); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams(); + CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); + CHECK(passed_params.file_system_name == FILESYSTEM_NAME); + CHECK(passed_params.directory_name == DIRECTORY_NAME); + CHECK(passed_params.filename == GETFILE_FILE_NAME); + CHECK(verifyLogLinePresenceInPollTime(1s, "key:filename value:" + GETFILE_FILE_NAME)); +} + +TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Delete file fails", "[azureDataLakeStorageDelete]") { + mock_data_lake_storage_client_ptr_->setDeleteFailure(true); + test_controller_.runSession(plan_, true); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams(); + CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); + CHECK(passed_params.file_system_name == FILESYSTEM_NAME); + CHECK(passed_params.directory_name == DIRECTORY_NAME); + CHECK(passed_params.filename == GETFILE_FILE_NAME); + CHECK_FALSE(LogTestController::getInstance().contains("key:filename value:", 0s, 0ms)); + auto failed_flowfiles = getFailedFlowFileContents(); + REQUIRE(failed_flowfiles.size() == 1); + REQUIRE(failed_flowfiles[0] == TEST_DATA); +} + +TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Delete result is false", "[azureDataLakeStorageDelete]") { + mock_data_lake_storage_client_ptr_->setDeleteResult(false); + test_controller_.runSession(plan_, true); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams(); + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); + CHECK(passed_params.file_system_name == FILESYSTEM_NAME); + CHECK(passed_params.directory_name == DIRECTORY_NAME); + CHECK(passed_params.filename == GETFILE_FILE_NAME); + CHECK_FALSE(LogTestController::getInstance().contains("key:filename value:", 0s, 0ms)); + auto failed_flowfiles = getFailedFlowFileContents(); + REQUIRE(failed_flowfiles.size() == 1); + REQUIRE(failed_flowfiles[0] == TEST_DATA); +} + +} // namespace diff --git a/libminifi/test/azure-tests/MockDataLakeStorageClient.h b/libminifi/test/azure-tests/MockDataLakeStorageClient.h new file mode 100644 index 0000000..eeac4dc --- /dev/null +++ b/libminifi/test/azure-tests/MockDataLakeStorageClient.h @@ -0,0 +1,96 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <stdexcept> + +#include "storage/DataLakeStorageClient.h" + +class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::storage::DataLakeStorageClient { + public: + const std::string PRIMARY_URI = "http://test-uri/file"; + + bool createFile(const org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters& /*params*/) override { + if (file_creation_error_) { + throw std::runtime_error("error"); + } + return create_file_; + } + + std::string uploadFile(const org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override { + input_data_ = std::string(buffer.begin(), buffer.end()); + put_params_ = params; + + if (upload_fails_) { + throw std::runtime_error("error"); + } + + return RETURNED_PRIMARY_URI; + } + + bool deleteFile(const org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters& params) override { + delete_params_ = params; + + if (delete_fails_) { + throw std::runtime_error("error"); + } + + return delete_result_; + } + + void setFileCreation(bool create_file) { + create_file_ = create_file; + } + + void setFileCreationError(bool file_creation_error) { + file_creation_error_ = file_creation_error; + } + + void setUploadFailure(bool upload_fails) { + upload_fails_ = upload_fails; + } + + void setDeleteFailure(bool delete_fails) { + delete_fails_ = delete_fails; + } + + void setDeleteResult(bool delete_result) { + delete_result_ = delete_result; + } + + org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters getPassedPutParams() const { + return put_params_; + } + + org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters getPassedDeleteParams() const { + return delete_params_; + } + + private: + const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas"; + bool create_file_ = true; + bool file_creation_error_ = false; + bool upload_fails_ = false; + bool delete_fails_ = false; + bool delete_result_ = true; + std::string input_data_; + org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters put_params_; + org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters delete_params_; +}; diff --git a/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp b/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp index 816de5d..9c2b771 100644 --- a/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp +++ b/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp @@ -145,8 +145,7 @@ class PutAzureBlobStorageTestsFixture { auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool { std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary); - std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()); - file_contents.push_back(file_content); + file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>())); return true; }; diff --git a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp index a0520d4..39463c4 100644 --- a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp +++ b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp @@ -16,156 +16,18 @@ * limitations under the License. */ -#include "../TestBase.h" -#include "utils/IntegrationTestUtils.h" -#include "utils/TestUtils.h" -#include "core/Processor.h" +#include "AzureDataLakeStorageTestsFixture.h" #include "processors/PutAzureDataLakeStorage.h" -#include "processors/GetFile.h" -#include "processors/PutFile.h" -#include "processors/LogAttribute.h" -#include "processors/UpdateAttribute.h" -#include "storage/DataLakeStorageClient.h" -#include "utils/file/FileUtils.h" #include "controllerservices/AzureStorageCredentialsService.h" -using namespace std::chrono_literals; - -const std::string FILESYSTEM_NAME = "testfilesystem"; -const std::string DIRECTORY_NAME = "testdir"; -const std::string FILE_NAME = "testfile.txt"; -const std::string CONNECTION_STRING = "test-connectionstring"; -const std::string TEST_DATA = "data123"; -const std::string GETFILE_FILE_NAME = "input_data.log"; - -class MockDataLakeStorageClient : public minifi::azure::storage::DataLakeStorageClient { - public: - const std::string PRIMARY_URI = "http://test-uri/file"; - - bool createFile(const minifi::azure::storage::PutAzureDataLakeStorageParameters& /*params*/) override { - if (file_creation_error_) { - throw std::runtime_error("error"); - } - return create_file_; - } - - std::string uploadFile(const minifi::azure::storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override { - input_data_ = std::string(buffer.begin(), buffer.end()); - params_ = params; - - if (upload_fails_) { - throw std::runtime_error("error"); - } - - return RETURNED_PRIMARY_URI; - } - - void setFileCreation(bool create_file) { - create_file_ = create_file; - } - - void setFileCreationError(bool file_creation_error) { - file_creation_error_ = file_creation_error; - } - - void setUploadFailure(bool upload_fails) { - upload_fails_ = upload_fails; - } - - minifi::azure::storage::PutAzureDataLakeStorageParameters getPassedParams() const { - return params_; - } - - private: - const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas"; - bool create_file_ = true; - bool file_creation_error_ = false; - bool upload_fails_ = false; - std::string input_data_; - minifi::azure::storage::PutAzureDataLakeStorageParameters params_; -}; +namespace { -class PutAzureDataLakeStorageTestsFixture { - public: - PutAzureDataLakeStorageTestsFixture() { - LogTestController::getInstance().setDebug<TestPlan>(); - LogTestController::getInstance().setDebug<minifi::core::Processor>(); - LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); - LogTestController::getInstance().setTrace<minifi::processors::GetFile>(); - LogTestController::getInstance().setTrace<minifi::processors::PutFile>(); - LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>(); - LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); - LogTestController::getInstance().setTrace<minifi::azure::processors::PutAzureDataLakeStorage>(); - - // Build MiNiFi processing graph - plan_ = test_controller_.createPlan(); - auto mock_data_lake_storage_client = std::make_unique<MockDataLakeStorageClient>(); - mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get(); - put_azure_data_lake_storage_ = std::shared_ptr<minifi::azure::processors::PutAzureDataLakeStorage>( - new minifi::azure::processors::PutAzureDataLakeStorage("PutAzureDataLakeStorage", utils::Identifier(), std::move(mock_data_lake_storage_client))); - auto input_dir = test_controller_.createTempDirectory(); - utils::putFileToDir(input_dir, GETFILE_FILE_NAME, TEST_DATA); - - get_file_ = plan_->addProcessor("GetFile", "GetFile"); - plan_->setProperty(get_file_, minifi::processors::GetFile::Directory.getName(), input_dir); - plan_->setProperty(get_file_, minifi::processors::GetFile::KeepSourceFile.getName(), "false"); - - update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true); - plan_->addProcessor(put_azure_data_lake_storage_, "PutAzureDataLakeStorage", { {"success", "d"}, {"failure", "d"} }, true); - auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true); - logattribute->setAutoTerminatedRelationships({{"success", "d"}}); - - putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false); - plan_->addConnection(put_azure_data_lake_storage_, {"failure", "d"}, putfile_); - putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}}); - output_dir_ = test_controller_.createTempDirectory(); - plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_); - - azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); - setDefaultProperties(); - } - - std::vector<std::string> getFailedFlowFileContents() { - std::vector<std::string> file_contents; - - auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool { - std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary); - std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()); - file_contents.push_back(file_content); - return true; - }; - - utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false); - return file_contents; - } - - void setDefaultProperties() { - plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "AzureStorageCredentialsService"); - plan_->setProperty(update_attribute_, "test.filesystemname", FILESYSTEM_NAME, true); - plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::FilesystemName.getName(), "${test.filesystemname}"); - plan_->setProperty(update_attribute_, "test.directoryname", DIRECTORY_NAME, true); - plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), "${test.directoryname}"); - plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING); - } - - virtual ~PutAzureDataLakeStorageTestsFixture() { - LogTestController::getInstance().reset(); - } +using namespace std::chrono_literals; - protected: - TestController test_controller_; - std::shared_ptr<TestPlan> plan_; - MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_; - std::shared_ptr<core::Processor> put_azure_data_lake_storage_; - std::shared_ptr<core::Processor> get_file_; - std::shared_ptr<core::Processor> update_attribute_; - std::shared_ptr<core::Processor> putfile_; - std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_; - std::string output_dir_; -}; +using PutAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::PutAzureDataLakeStorage>; TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") { - plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), ""); + plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), ""); REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); REQUIRE(getFailedFlowFileContents().size() == 0); } @@ -176,7 +38,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials wi plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT"); plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), ""); test_controller_.runSession(plan_, true); - auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams(); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams(); CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token"); REQUIRE(getFailedFlowFileContents().size() == 0); } @@ -187,7 +49,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials wi plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token"); plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT"); test_controller_.runSession(plan_, true); - auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams(); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams(); CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); REQUIRE(getFailedFlowFileContents().size() == 0); } @@ -198,7 +60,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials wi plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::UseManagedIdentityCredentials.getName(), "true"); plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT"); test_controller_.runSession(plan_, true); - auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams(); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams(); CHECK(passed_params.credentials.buildConnectionString().empty()); CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT"); CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net"); @@ -223,7 +85,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Connection String is empt TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with default parameters", "[azureDataLakeStorageUpload]") { test_controller_.runSession(plan_, true); - auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams(); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams(); CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); CHECK(passed_params.file_system_name == FILESYSTEM_NAME); CHECK(passed_params.directory_name == DIRECTORY_NAME); @@ -263,7 +125,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to failure on 'f } TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to success on 'ignore' resolution strategy if file exists", "[azureDataLakeStorageUpload]") { - plan_->setProperty(put_azure_data_lake_storage_, + plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy.getName(), toString(minifi::azure::processors::PutAzureDataLakeStorage::FileExistsResolutionStrategy::IGNORE_REQUEST)); mock_data_lake_storage_client_ptr_->setFileCreation(false); @@ -275,12 +137,12 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to success on 'i } TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'replace' resolution strategy if file exists", "[azureDataLakeStorageUpload]") { - plan_->setProperty(put_azure_data_lake_storage_, + plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy.getName(), toString(minifi::azure::processors::PutAzureDataLakeStorage::FileExistsResolutionStrategy::REPLACE_FILE)); mock_data_lake_storage_client_ptr_->setFileCreation(false); test_controller_.runSession(plan_, true); - auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams(); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams(); CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); CHECK(passed_params.file_system_name == FILESYSTEM_NAME); CHECK(passed_params.directory_name == DIRECTORY_NAME); @@ -296,11 +158,13 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'repl } TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with empty directory is accepted", "[azureDataLakeStorageUpload]") { - plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), ""); + plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), ""); test_controller_.runSession(plan_, true); - auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams(); + auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams(); CHECK(passed_params.directory_name == ""); REQUIRE(getFailedFlowFileContents().size() == 0); using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:\n")); } + +} // namespace
