This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch minifi-api-reduced in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a28b89e7afea7abcd8c3a0d37ec5d55908606557 Author: Adam Debreceni <[email protected]> AuthorDate: Wed Mar 19 09:46:17 2025 +0100 Review changes --- extensions/aws/tests/S3TestsFixture.h | 4 +- .../azure/processors/AzureStorageProcessorBase.h | 2 +- .../azure/tests/AzureBlobStorageTestsFixture.h | 4 +- .../azure/tests/AzureDataLakeStorageTestsFixture.h | 2 +- .../azure/tests/ListAzureBlobStorageTests.cpp | 2 +- .../azure/tests/ListAzureDataLakeStorageTests.cpp | 2 +- extensions/libarchive/BinFiles.h | 2 +- extensions/libarchive/CompressContent.h | 2 +- extensions/libarchive/UnfocusArchiveEntry.h | 2 +- extensions/mqtt/processors/ConsumeMQTT.h | 2 +- extensions/mqtt/processors/PublishMQTT.h | 2 +- extensions/opc/include/fetchopc.h | 1 - extensions/opc/include/opcbase.h | 2 +- extensions/opc/include/putopc.h | 1 - extensions/opc/src/fetchopc.cpp | 1 - extensions/pdh/PerformanceDataMonitor.h | 2 +- extensions/procfs/processors/ProcFsMonitor.h | 2 +- extensions/python/PythonProcessor.h | 2 +- extensions/python/PythonScriptEngine.h | 2 +- extensions/python/types/PyProcessContext.cpp | 2 +- extensions/script/ExecuteScript.h | 2 +- extensions/sftp/processors/FetchSFTP.h | 2 +- extensions/sftp/processors/ListSFTP.h | 2 +- extensions/sftp/processors/PutSFTP.h | 2 +- extensions/smb/FetchSmb.h | 2 +- extensions/smb/ListSmb.h | 2 +- extensions/smb/PutSmb.h | 2 +- .../standard-processors/modbus/FetchModbusTcp.h | 2 +- .../processors/DefragmentText.h | 2 +- .../standard-processors/processors/ListFile.h | 2 +- extensions/standard-processors/processors/PutTCP.h | 3 + extensions/standard-processors/processors/PutUDP.h | 2 +- .../standard-processors/processors/RetryFlowFile.h | 2 +- extensions/systemd/ConsumeJournald.h | 2 +- extensions/test-processors/KamikazeProcessor.h | 2 +- .../test-processors/LogOnDestructionProcessor.h | 2 +- .../windows-event-log/ConsumeWindowsEventLog.h | 2 +- extensions/windows-event-log/TailEventLog.h | 2 +- extensions/windows-event-log/wel/MetadataWalker.h | 2 +- extensions/windows-event-log/wel/WindowsEventLog.h | 2 +- extensions/windows-event-log/wel/XMLString.h | 2 +- libminifi/include/Port.h | 13 +- libminifi/include/core/ProcessGroup.h | 2 +- libminifi/include/core/Processor.h | 165 ++++++++---- libminifi/include/core/ProcessorProxy.h | 281 --------------------- libminifi/src/core/ClassLoader.cpp | 4 +- libminifi/src/core/FlowConfiguration.cpp | 1 - libminifi/src/core/Processor.cpp | 189 ++++++++++++-- .../src/core/flow/StructuredConfiguration.cpp | 10 +- libminifi/test/libtest/unit/TestUtils.h | 6 +- .../include/minifi-cpp/core/ProcessContext.h | 1 - .../minifi-cpp/core/ProcessContextBuilder.h | 1 - .../core/{Processor.h => ProcessorApi.h} | 0 .../include/minifi-cpp/core/ProcessorFactory.h | 3 +- utils/include/core/Processor.h | 19 -- utils/include/core/ProcessorImpl.h | 2 +- .../src/core/{Processor.cpp => ProcessorImpl.cpp} | 2 +- utils/src/core/ProcessorMetrics.cpp | 2 +- 58 files changed, 342 insertions(+), 443 deletions(-) diff --git a/extensions/aws/tests/S3TestsFixture.h b/extensions/aws/tests/S3TestsFixture.h index f65030f84..360e2722f 100644 --- a/extensions/aws/tests/S3TestsFixture.h +++ b/extensions/aws/tests/S3TestsFixture.h @@ -137,7 +137,7 @@ class FlowProcessorS3TestsFixture : public S3TestsFixture<T> { this->mock_s3_request_sender_ptr = mock_s3_request_sender.get(); auto uuid = utils::IdGenerator::getIdGenerator()->generate(); auto impl = std::unique_ptr<T>(new T(core::ProcessorMetadata{.uuid = uuid, .name = "S3Processor", .logger = core::logging::LoggerFactory<T>::getLogger(uuid)}, std::move(mock_s3_request_sender))); - auto s3_processor_unique_ptr = std::make_unique<core::ProcessorProxy>("S3Processor", uuid, std::move(impl)); + auto s3_processor_unique_ptr = std::make_unique<core::Processor>("S3Processor", uuid, std::move(impl)); this->s3_processor = s3_processor_unique_ptr.get(); auto input_dir = this->test_controller.createTempDirectory(); @@ -201,7 +201,7 @@ class FlowProducerS3TestsFixture : public S3TestsFixture<T> { this->mock_s3_request_sender_ptr = mock_s3_request_sender.get(); auto uuid = utils::IdGenerator::getIdGenerator()->generate(); auto impl = std::unique_ptr<T>(new T(core::ProcessorMetadata{.uuid = uuid, .name = "S3Processor", .logger = core::logging::LoggerFactory<T>::getLogger(uuid)}, std::move(mock_s3_request_sender))); - auto s3_processor_unique_ptr = std::make_unique<core::ProcessorProxy>("S3Processor", uuid, std::move(impl)); + auto s3_processor_unique_ptr = std::make_unique<core::Processor>("S3Processor", uuid, std::move(impl)); this->s3_processor = s3_processor_unique_ptr.get(); this->plan->addProcessor( diff --git a/extensions/azure/processors/AzureStorageProcessorBase.h b/extensions/azure/processors/AzureStorageProcessorBase.h index 13955ee4d..5eb0b07b1 100644 --- a/extensions/azure/processors/AzureStorageProcessorBase.h +++ b/extensions/azure/processors/AzureStorageProcessorBase.h @@ -29,7 +29,7 @@ #include "core/PropertyDefinition.h" #include "core/Property.h" #include "core/PropertyDefinitionBuilder.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/logging/Logger.h" #include "storage/AzureStorageCredentials.h" diff --git a/extensions/azure/tests/AzureBlobStorageTestsFixture.h b/extensions/azure/tests/AzureBlobStorageTestsFixture.h index 68d278711..897a7905f 100644 --- a/extensions/azure/tests/AzureBlobStorageTestsFixture.h +++ b/extensions/azure/tests/AzureBlobStorageTestsFixture.h @@ -33,7 +33,7 @@ #include "processors/UpdateAttribute.h" #include "utils/file/FileUtils.h" #include "MockBlobStorage.h" -#include "core/ProcessorProxy.h" +#include "core/Processor.h" const std::string CONTAINER_NAME = "test-container"; const std::string STORAGE_ACCOUNT_NAME = "test-account"; @@ -65,7 +65,7 @@ class AzureBlobStorageTestsFixture { auto uuid = utils::IdGenerator::getIdGenerator()->generate(); auto impl = std::unique_ptr<ProcessorType>( new ProcessorType({.uuid = uuid, .name = "AzureBlobStorageProcessor", .logger = logging::LoggerFactory<ProcessorType>::getLogger(uuid)}, std::move(mock_blob_storage))); - auto azure_blob_storage_processor_unique_ptr = std::make_unique<core::ProcessorProxy>(impl->getName(), impl->getUUID(), std::move(impl)); + auto azure_blob_storage_processor_unique_ptr = std::make_unique<core::Processor>(impl->getName(), impl->getUUID(), std::move(impl)); azure_blob_storage_processor_ = azure_blob_storage_processor_unique_ptr.get(); auto input_dir = test_controller_.createTempDirectory(); std::ofstream input_file_stream(input_dir / GET_FILE_NAME); diff --git a/extensions/azure/tests/AzureDataLakeStorageTestsFixture.h b/extensions/azure/tests/AzureDataLakeStorageTestsFixture.h index e49b3959a..e7f505f3c 100644 --- a/extensions/azure/tests/AzureDataLakeStorageTestsFixture.h +++ b/extensions/azure/tests/AzureDataLakeStorageTestsFixture.h @@ -63,7 +63,7 @@ class AzureDataLakeStorageTestsFixture { auto uuid = utils::IdGenerator::getIdGenerator()->generate(); auto impl = std::unique_ptr<AzureDataLakeStorageProcessor>( new AzureDataLakeStorageProcessor({.uuid = uuid, .name = "AzureDataLakeStorageProcessor", .logger = logging::LoggerFactory<AzureDataLakeStorageProcessor>::getLogger(uuid)}, std::move(mock_data_lake_storage_client))); - auto azure_data_lake_storage_unique_ptr = std::make_unique<core::ProcessorProxy>(impl->getName(), impl->getUUID(), std::move(impl)); + auto azure_data_lake_storage_unique_ptr = std::make_unique<core::Processor>(impl->getName(), impl->getUUID(), std::move(impl)); azure_data_lake_storage_ = azure_data_lake_storage_unique_ptr.get(); auto input_dir = test_controller_.createTempDirectory(); minifi::test::utils::putFileToDir(input_dir, GETFILE_FILE_NAME, TEST_DATA); diff --git a/extensions/azure/tests/ListAzureBlobStorageTests.cpp b/extensions/azure/tests/ListAzureBlobStorageTests.cpp index a63517035..48c222d58 100644 --- a/extensions/azure/tests/ListAzureBlobStorageTests.cpp +++ b/extensions/azure/tests/ListAzureBlobStorageTests.cpp @@ -47,7 +47,7 @@ class ListAzureBlobStorageTestsFixture { mock_blob_storage_ptr_ = mock_blob_storage.get(); auto uuid = utils::IdGenerator::getIdGenerator()->generate(); auto impl = std::make_unique<minifi::azure::processors::ListAzureBlobStorage>(core::ProcessorMetadata{.uuid = uuid, .name = "ListAzureBlobStorage", .logger = logging::LoggerFactory<minifi::azure::processors::ListAzureBlobStorage>::getLogger(uuid)}, std::move(mock_blob_storage)); - auto list_azure_blob_storage_unique_ptr = std::make_unique<core::ProcessorProxy>(impl->getName(), impl->getUUID(), std::move(impl)); + auto list_azure_blob_storage_unique_ptr = std::make_unique<core::Processor>(impl->getName(), impl->getUUID(), std::move(impl)); list_azure_blob_storage_ = list_azure_blob_storage_unique_ptr.get(); plan_->addProcessor(std::move(list_azure_blob_storage_unique_ptr), "ListAzureBlobStorage", { {"success", "d"} }); diff --git a/extensions/azure/tests/ListAzureDataLakeStorageTests.cpp b/extensions/azure/tests/ListAzureDataLakeStorageTests.cpp index ba89a8b5b..9335336f2 100644 --- a/extensions/azure/tests/ListAzureDataLakeStorageTests.cpp +++ b/extensions/azure/tests/ListAzureDataLakeStorageTests.cpp @@ -45,7 +45,7 @@ class ListAzureDataLakeStorageTestsFixture { auto uuid = utils::IdGenerator::getIdGenerator()->generate(); auto impl = std::unique_ptr<minifi::azure::processors::ListAzureDataLakeStorage>( new minifi::azure::processors::ListAzureDataLakeStorage({.uuid = uuid, .name = "ListAzureDataLakeStorage", .logger = logging::LoggerFactory<minifi::azure::processors::ListAzureDataLakeStorage>::getLogger(uuid)}, std::move(mock_data_lake_storage_client))); - auto list_azure_data_lake_storage_unique_ptr = std::make_unique<core::ProcessorProxy>(impl->getName(), impl->getUUID(), std::move(impl)); + auto list_azure_data_lake_storage_unique_ptr = std::make_unique<core::Processor>(impl->getName(), impl->getUUID(), std::move(impl)); list_azure_data_lake_storage_ = list_azure_data_lake_storage_unique_ptr.get(); plan_->addProcessor(std::move(list_azure_data_lake_storage_unique_ptr), "ListAzureDataLakeStorage", { {"success", "d"} }); diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h index 5d89c1790..0c5517ee0 100644 --- a/extensions/libarchive/BinFiles.h +++ b/extensions/libarchive/BinFiles.h @@ -27,7 +27,7 @@ #include <utility> #include "FlowFileRecord.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h index 361ca871c..171a9c1e4 100644 --- a/extensions/libarchive/CompressContent.h +++ b/extensions/libarchive/CompressContent.h @@ -31,7 +31,7 @@ #include "archive.h" #include "FlowFileRecord.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/Core.h" #include "core/PropertyDefinition.h" diff --git a/extensions/libarchive/UnfocusArchiveEntry.h b/extensions/libarchive/UnfocusArchiveEntry.h index f89832d5b..0261eb57f 100644 --- a/extensions/libarchive/UnfocusArchiveEntry.h +++ b/extensions/libarchive/UnfocusArchiveEntry.h @@ -28,7 +28,7 @@ #include "FocusArchiveEntry.h" #include "FlowFileRecord.h" #include "ArchiveMetadata.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/RelationshipDefinition.h" #include "core/Core.h" diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h index 87c2b3d31..5331763f7 100644 --- a/extensions/mqtt/processors/ConsumeMQTT.h +++ b/extensions/mqtt/processors/ConsumeMQTT.h @@ -26,7 +26,7 @@ #include "FlowFileRecord.h" #include "core/Core.h" #include "core/OutputAttributeDefinition.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" #include "core/logging/LoggerFactory.h" diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h index d6d9ed7cd..3f252e6b9 100644 --- a/extensions/mqtt/processors/PublishMQTT.h +++ b/extensions/mqtt/processors/PublishMQTT.h @@ -24,7 +24,7 @@ #include <vector> #include "core/RelationshipDefinition.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/PropertyDefinitionBuilder.h" #include "core/Core.h" diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h index 6b7f0e04a..db23f7aa2 100644 --- a/extensions/opc/include/fetchopc.h +++ b/extensions/opc/include/fetchopc.h @@ -25,7 +25,6 @@ #include "opc.h" #include "opcbase.h" #include "FlowFileRecord.h" -#include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Property.h" #include "core/PropertyDefinitionBuilder.h" diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h index a8e699f20..bdb79f558 100644 --- a/extensions/opc/include/opcbase.h +++ b/extensions/opc/include/opcbase.h @@ -23,7 +23,7 @@ #include <vector> #include "opc.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h index 4a64b2e29..db76d4329 100644 --- a/extensions/opc/include/putopc.h +++ b/extensions/opc/include/putopc.h @@ -25,7 +25,6 @@ #include "opc.h" #include "opcbase.h" #include "FlowFileRecord.h" -#include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Property.h" #include "core/PropertyDefinitionBuilder.h" diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp index f148d2de4..7a992aa60 100644 --- a/extensions/opc/src/fetchopc.cpp +++ b/extensions/opc/src/fetchopc.cpp @@ -23,7 +23,6 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" -#include "core/Processor.h" #include "core/Resource.h" #include "opc.h" #include "utils/Enum.h" diff --git a/extensions/pdh/PerformanceDataMonitor.h b/extensions/pdh/PerformanceDataMonitor.h index 024d0333a..b644d81a5 100644 --- a/extensions/pdh/PerformanceDataMonitor.h +++ b/extensions/pdh/PerformanceDataMonitor.h @@ -25,7 +25,7 @@ #include <vector> #include "core/logging/LoggerFactory.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "core/RelationshipDefinition.h" diff --git a/extensions/procfs/processors/ProcFsMonitor.h b/extensions/procfs/processors/ProcFsMonitor.h index 63764d0ed..c4ce0cd28 100644 --- a/extensions/procfs/processors/ProcFsMonitor.h +++ b/extensions/procfs/processors/ProcFsMonitor.h @@ -25,7 +25,7 @@ #include <optional> #include "../ProcFs.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "core/RelationshipDefinition.h" diff --git a/extensions/python/PythonProcessor.h b/extensions/python/PythonProcessor.h index 84915b213..a367a7dde 100644 --- a/extensions/python/PythonProcessor.h +++ b/extensions/python/PythonProcessor.h @@ -23,7 +23,7 @@ #include <memory> #include <optional> -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "utils/gsl.h" namespace org::apache::nifi::minifi::extensions::python { diff --git a/extensions/python/PythonScriptEngine.h b/extensions/python/PythonScriptEngine.h index e29383aaa..8b7348a9c 100644 --- a/extensions/python/PythonScriptEngine.h +++ b/extensions/python/PythonScriptEngine.h @@ -26,7 +26,7 @@ #include <filesystem> #include "core/ProcessSession.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "PythonBindings.h" #include "PyException.h" diff --git a/extensions/python/types/PyProcessContext.cpp b/extensions/python/types/PyProcessContext.cpp index 910e1b3f4..4d9e9af0d 100644 --- a/extensions/python/types/PyProcessContext.cpp +++ b/extensions/python/types/PyProcessContext.cpp @@ -21,7 +21,7 @@ a * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #include "PyStateManager.h" #include "PyScriptFlowFile.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "controllers/RecordSetReader.h" #include "controllers/RecordSetWriter.h" diff --git a/extensions/script/ExecuteScript.h b/extensions/script/ExecuteScript.h index 3cd8284ec..1a75f2c89 100644 --- a/extensions/script/ExecuteScript.h +++ b/extensions/script/ExecuteScript.h @@ -25,7 +25,7 @@ #include <optional> #include "concurrentqueue.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "core/RelationshipDefinition.h" diff --git a/extensions/sftp/processors/FetchSFTP.h b/extensions/sftp/processors/FetchSFTP.h index 7a1d86f8f..510e780f7 100644 --- a/extensions/sftp/processors/FetchSFTP.h +++ b/extensions/sftp/processors/FetchSFTP.h @@ -23,7 +23,7 @@ #include "SFTPProcessorBase.h" #include "utils/ByteArrayCallback.h" #include "FlowFileRecord.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/Core.h" #include "core/Property.h" diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h index daf5824ef..b8f8da831 100644 --- a/extensions/sftp/processors/ListSFTP.h +++ b/extensions/sftp/processors/ListSFTP.h @@ -30,7 +30,7 @@ #include <string_view> #include "SFTPProcessorBase.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/Property.h" #include "core/PropertyDefinitionBuilder.h" diff --git a/extensions/sftp/processors/PutSFTP.h b/extensions/sftp/processors/PutSFTP.h index 93ecf78c9..2343bc4d7 100644 --- a/extensions/sftp/processors/PutSFTP.h +++ b/extensions/sftp/processors/PutSFTP.h @@ -27,7 +27,7 @@ #include "SFTPProcessorBase.h" #include "utils/ByteArrayCallback.h" #include "FlowFileRecord.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/Core.h" #include "core/Property.h" diff --git a/extensions/smb/FetchSmb.h b/extensions/smb/FetchSmb.h index 8f84d361d..3d3995b6d 100644 --- a/extensions/smb/FetchSmb.h +++ b/extensions/smb/FetchSmb.h @@ -23,7 +23,7 @@ #include <utility> #include "SmbConnectionControllerService.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/Property.h" #include "core/PropertyDefinition.h" diff --git a/extensions/smb/ListSmb.h b/extensions/smb/ListSmb.h index 3e613487e..b00e64b04 100644 --- a/extensions/smb/ListSmb.h +++ b/extensions/smb/ListSmb.h @@ -23,7 +23,7 @@ #include <utility> #include "SmbConnectionControllerService.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/Property.h" #include "core/PropertyDefinition.h" diff --git a/extensions/smb/PutSmb.h b/extensions/smb/PutSmb.h index 2a531a73f..a497e3b3c 100644 --- a/extensions/smb/PutSmb.h +++ b/extensions/smb/PutSmb.h @@ -21,7 +21,7 @@ #include <string> #include <utility> -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "utils/Enum.h" #include "SmbConnectionControllerService.h" diff --git a/extensions/standard-processors/modbus/FetchModbusTcp.h b/extensions/standard-processors/modbus/FetchModbusTcp.h index 3b21b9da0..9fb0c4d4a 100644 --- a/extensions/standard-processors/modbus/FetchModbusTcp.h +++ b/extensions/standard-processors/modbus/FetchModbusTcp.h @@ -18,7 +18,7 @@ #include "controllers/RecordSetWriter.h" #include "controllers/SSLContextServiceInterface.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/PropertyDefinitionBuilder.h" #include "core/logging/LoggerFactory.h" #include "utils/net/AsioCoro.h" diff --git a/extensions/standard-processors/processors/DefragmentText.h b/extensions/standard-processors/processors/DefragmentText.h index 329d64d4d..300224970 100644 --- a/extensions/standard-processors/processors/DefragmentText.h +++ b/extensions/standard-processors/processors/DefragmentText.h @@ -23,7 +23,7 @@ #include <unordered_map> #include <utility> -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/FlowFileStore.h" #include "core/logging/Logger.h" #include "core/logging/LoggerFactory.h" diff --git a/extensions/standard-processors/processors/ListFile.h b/extensions/standard-processors/processors/ListFile.h index 8a3037c4e..929d55e05 100644 --- a/extensions/standard-processors/processors/ListFile.h +++ b/extensions/standard-processors/processors/ListFile.h @@ -23,7 +23,7 @@ #include <utility> #include "core/OutputAttributeDefinition.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" diff --git a/extensions/standard-processors/processors/PutTCP.h b/extensions/standard-processors/processors/PutTCP.h index 71ea6858c..f95ec64c3 100644 --- a/extensions/standard-processors/processors/PutTCP.h +++ b/extensions/standard-processors/processors/PutTCP.h @@ -25,6 +25,9 @@ #include <utility> #include <vector> +#include "io/InputStream.h" +#include "core/ProcessorImpl.h" +#include "utils/Export.h" #include "asio/io_context.hpp" #include "asio/ssl/context.hpp" #include "controllers/SSLContextServiceInterface.h" diff --git a/extensions/standard-processors/processors/PutUDP.h b/extensions/standard-processors/processors/PutUDP.h index e4d7bf850..b8fbbd584 100644 --- a/extensions/standard-processors/processors/PutUDP.h +++ b/extensions/standard-processors/processors/PutUDP.h @@ -21,7 +21,7 @@ #include <utility> #include <vector> -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "core/RelationshipDefinition.h" diff --git a/extensions/standard-processors/processors/RetryFlowFile.h b/extensions/standard-processors/processors/RetryFlowFile.h index b6db59d84..8322f82c0 100644 --- a/extensions/standard-processors/processors/RetryFlowFile.h +++ b/extensions/standard-processors/processors/RetryFlowFile.h @@ -27,7 +27,7 @@ #include "core/Core.h" #include "core/OutputAttributeDefinition.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" diff --git a/extensions/systemd/ConsumeJournald.h b/extensions/systemd/ConsumeJournald.h index 8557adb16..16a3f2e10 100644 --- a/extensions/systemd/ConsumeJournald.h +++ b/extensions/systemd/ConsumeJournald.h @@ -30,7 +30,7 @@ #include <vector> #include "core/StateManager.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" diff --git a/extensions/test-processors/KamikazeProcessor.h b/extensions/test-processors/KamikazeProcessor.h index d2a58ae9d..085fc48d9 100644 --- a/extensions/test-processors/KamikazeProcessor.h +++ b/extensions/test-processors/KamikazeProcessor.h @@ -19,7 +19,7 @@ #include <string> #include <utility> -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" diff --git a/extensions/test-processors/LogOnDestructionProcessor.h b/extensions/test-processors/LogOnDestructionProcessor.h index 1937dc9d6..61186865a 100644 --- a/extensions/test-processors/LogOnDestructionProcessor.h +++ b/extensions/test-processors/LogOnDestructionProcessor.h @@ -23,7 +23,7 @@ #include <utility> #include "core/logging/LoggerFactory.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" namespace org::apache::nifi::minifi::processors { diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h index ecb395698..1fc66e64f 100644 --- a/extensions/windows-event-log/ConsumeWindowsEventLog.h +++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h @@ -35,7 +35,7 @@ #include <string> #include "core/Core.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" diff --git a/extensions/windows-event-log/TailEventLog.h b/extensions/windows-event-log/TailEventLog.h index d91ff85bc..e8b606f64 100644 --- a/extensions/windows-event-log/TailEventLog.h +++ b/extensions/windows-event-log/TailEventLog.h @@ -26,7 +26,7 @@ #include "core/Core.h" #include "FlowFileRecord.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" diff --git a/extensions/windows-event-log/wel/MetadataWalker.h b/extensions/windows-event-log/wel/MetadataWalker.h index 869e254f1..9826f09e2 100644 --- a/extensions/windows-event-log/wel/MetadataWalker.h +++ b/extensions/windows-event-log/wel/MetadataWalker.h @@ -32,7 +32,7 @@ #include <utility> #include "core/Core.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "FlowFileRecord.h" #include "WindowsEventLog.h" diff --git a/extensions/windows-event-log/wel/WindowsEventLog.h b/extensions/windows-event-log/wel/WindowsEventLog.h index cef720a3b..5a2029f2d 100644 --- a/extensions/windows-event-log/wel/WindowsEventLog.h +++ b/extensions/windows-event-log/wel/WindowsEventLog.h @@ -32,7 +32,7 @@ #include "core/Core.h" #include "concurrentqueue.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "utils/OsUtils.h" #include "FlowFileRecord.h" diff --git a/extensions/windows-event-log/wel/XMLString.h b/extensions/windows-event-log/wel/XMLString.h index 18d35bb86..cb42218c4 100644 --- a/extensions/windows-event-log/wel/XMLString.h +++ b/extensions/windows-event-log/wel/XMLString.h @@ -28,7 +28,7 @@ #include <regex> #include "core/Core.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "FlowFileRecord.h" #include "utils/OsUtils.h" diff --git a/libminifi/include/Port.h b/libminifi/include/Port.h index 201d4faee..a8970e6bf 100644 --- a/libminifi/include/Port.h +++ b/libminifi/include/Port.h @@ -21,7 +21,7 @@ #include <utility> #include "ForwardingNode.h" -#include "core/ProcessorProxy.h" +#include "core/Processor.h" namespace org::apache::nifi::minifi { @@ -30,10 +30,7 @@ enum class PortType { OUTPUT }; -class Port : public virtual core::Processor { - public: - virtual PortType getPortType() const = 0; -}; +class Port; class PortImpl final : public ForwardingNode { public: @@ -49,11 +46,11 @@ class PortImpl final : public ForwardingNode { PortType port_type_; }; -class PortProxy : public virtual Port, public core::ProcessorProxy { +class Port : public core::Processor { public: - PortProxy(std::string_view name, const utils::Identifier& uuid, std::unique_ptr<PortImpl> impl): ProcessorProxy(name, uuid, std::move(impl)) {} + Port(std::string_view name, const utils::Identifier& uuid, std::unique_ptr<PortImpl> impl): Processor(name, uuid, std::move(impl)) {} - PortType getPortType() const override { + PortType getPortType() const { auto* port_impl = dynamic_cast<const PortImpl*>(impl_.get()); gsl_Assert(port_impl); return port_impl->getPortType(); diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index 1a267b04e..fc6b8ec89 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -29,7 +29,7 @@ #include <utility> #include <tuple> -#include "minifi-cpp/core/Processor.h" +#include "core/Processor.h" #include "Exception.h" #include "TimerDrivenSchedulingAgent.h" #include "EventDrivenSchedulingAgent.h" diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 285b16d2d..5558db8c5 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -16,23 +16,34 @@ */ #pragma once +#include <algorithm> +#include <atomic> #include <chrono> +#include <condition_variable> +#include <functional> #include <memory> #include <mutex> #include <string> +#include <string_view> #include <unordered_set> #include <unordered_map> +#include <utility> +#include <vector> -#include "minifi-cpp/core/ConfigurableComponent.h" -#include "minifi-cpp/core/Connectable.h" -#include "minifi-cpp/core/Property.h" -#include "minifi-cpp/core/DynamicProperty.h" -#include "minifi-cpp/core/Core.h" +#include "core/ConfigurableComponentImpl.h" +#include "core/Connectable.h" +#include "core/Property.h" +#include "core/Core.h" #include "minifi-cpp/core/Annotation.h" +#include "minifi-cpp/core/DynamicProperty.h" #include "minifi-cpp/core/Scheduling.h" #include "minifi-cpp/core/state/nodes/MetricsBase.h" #include "minifi-cpp/core/ProcessorMetrics.h" #include "utils/gsl.h" +#include "utils/Id.h" +#include "minifi-cpp/core/OutputAttributeDefinition.h" +#include "Processor.h" +#include "minifi-cpp/core/ProcessorApi.h" namespace org::apache::nifi::minifi { @@ -40,55 +51,78 @@ class Connection; namespace core { -class ProcessorApi; class ProcessContext; class ProcessSession; class ProcessSessionFactory; -class Processor : public virtual Connectable, public virtual ConfigurableComponent, public virtual state::response::ResponseNodeSource { +class Processor : public ConnectableImpl, public ConfigurableComponentImpl, public state::response::ResponseNodeSource { public: - ~Processor() override = default; - - virtual void setScheduledState(ScheduledState state) = 0; - virtual ScheduledState getScheduledState() const = 0; - virtual void setSchedulingStrategy(SchedulingStrategy strategy) = 0; - virtual SchedulingStrategy getSchedulingStrategy() const = 0; - virtual void setSchedulingPeriod(std::chrono::steady_clock::duration period) = 0; - virtual std::chrono::steady_clock::duration getSchedulingPeriod() const = 0; - virtual void setCronPeriod(const std::string &period) = 0; - virtual std::string getCronPeriod() const = 0; - virtual void setRunDurationNano(std::chrono::steady_clock::duration period) = 0; - virtual std::chrono::steady_clock::duration getRunDurationNano() const = 0; - virtual void setYieldPeriodMsec(std::chrono::milliseconds period) = 0; - virtual std::chrono::steady_clock::duration getYieldPeriod() const = 0; - virtual void setPenalizationPeriod(std::chrono::milliseconds period) = 0; - virtual bool isSingleThreaded() const = 0; - virtual std::string getProcessorType() const = 0; - virtual bool getTriggerWhenEmpty() const = 0; - virtual uint8_t getActiveTasks() const = 0; - virtual void incrementActiveTasks() = 0; - virtual void decrementActiveTask() = 0; - virtual void clearActiveTask() = 0; - using Connectable::yield; - virtual void yield(std::chrono::steady_clock::duration delta_time) = 0; - virtual bool isYield() = 0; - virtual void clearYield() = 0; - virtual std::chrono::steady_clock::time_point getYieldExpirationTime() const = 0; - virtual std::chrono::steady_clock::duration getYieldTime() const = 0; - virtual bool addConnection(Connectable* connection) = 0; - virtual void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) = 0; - virtual void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) = 0; - virtual void onTrigger(ProcessContext&, ProcessSession&) = 0; - virtual void onSchedule(ProcessContext&, ProcessSessionFactory&) = 0; - virtual void onUnSchedule() = 0; - virtual bool isThrottledByBackpressure() const = 0; - virtual void validateAnnotations() const = 0; - virtual annotation::Input getInputRequirement() const = 0; - virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const = 0; - virtual std::string getProcessGroupUUIDStr() const = 0; - virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0; - - virtual ProcessorApi& getImpl() const = 0; + Processor(std::string_view name, const utils::Identifier& uuid, std::unique_ptr<ProcessorApi> impl); + explicit Processor(std::string_view name, std::unique_ptr<ProcessorApi> impl); + + Processor(const Processor& parent) = delete; + Processor& operator=(const Processor& parent) = delete; + + bool isRunning() const override; + + ~Processor() override; + + void setScheduledState(ScheduledState state); + ScheduledState getScheduledState() const; + void setSchedulingStrategy(SchedulingStrategy strategy); + SchedulingStrategy getSchedulingStrategy() const; + void setSchedulingPeriod(std::chrono::steady_clock::duration period); + std::chrono::steady_clock::duration getSchedulingPeriod() const; + void setCronPeriod(const std::string &period); + std::string getCronPeriod() const; + void setRunDurationNano(std::chrono::steady_clock::duration period); + std::chrono::steady_clock::duration getRunDurationNano() const; + void setYieldPeriodMsec(std::chrono::milliseconds period); + std::chrono::steady_clock::duration getYieldPeriod() const; + void setPenalizationPeriod(std::chrono::milliseconds period); + void setMaxConcurrentTasks(uint8_t tasks) override; + bool isSingleThreaded() const; + std::string getProcessorType() const; + bool getTriggerWhenEmpty() const; + uint8_t getActiveTasks() const; + void incrementActiveTasks(); + void decrementActiveTask(); + void clearActiveTask(); + std::string getProcessGroupUUIDStr() const; + void setProcessGroupUUIDStr(const std::string &uuid); + void yield() override; + void yield(std::chrono::steady_clock::duration delta_time); + bool isYield(); + void clearYield(); + std::chrono::steady_clock::time_point getYieldExpirationTime() const; + std::chrono::steady_clock::duration getYieldTime() const; + bool addConnection(Connectable* connection); + bool canEdit() override; + void initialize() override; + void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory); + void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session); + void onTrigger(ProcessContext& context, ProcessSession& session); + void onSchedule(ProcessContext& context, ProcessSessionFactory& session_factory); + void onUnSchedule(); + bool isWorkAvailable() override; + bool isThrottledByBackpressure() const; + Connectable* pickIncomingConnection() override; + void validateAnnotations() const; + annotation::Input getInputRequirement() const; + [[nodiscard]] bool supportsDynamicProperties() const override; + [[nodiscard]] bool supportsDynamicRelationships() const override; + state::response::SharedResponseNode getResponseNode() override; + gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const; + void restore(const std::shared_ptr<FlowFile>& file) override; + + static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{}; + + static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{}; + + ProcessorApi& getImpl() const { + gsl_Assert(impl_); + return *impl_; + } template<typename T> T& getImpl() const { @@ -97,8 +131,37 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone return *res; } - virtual void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) = 0; - virtual const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const = 0; + protected: + std::atomic<ScheduledState> state_; + + std::atomic<std::chrono::steady_clock::duration> scheduling_period_; + std::atomic<std::chrono::steady_clock::duration> run_duration_; + std::atomic<std::chrono::steady_clock::duration> yield_period_; + + std::atomic<uint8_t> active_tasks_; + + std::string cron_period_; + + std::shared_ptr<logging::Logger> logger_; + + private: + mutable std::mutex mutex_; + std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{}; + + // must hold the graphMutex + void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false); + + const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const; + + static bool partOfCycle(Connection* conn); + + // an outgoing connection allows us to reach these nodes + std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_; + + std::string process_group_uuid_; + + protected: + std::unique_ptr<ProcessorApi> impl_; }; } // namespace core diff --git a/libminifi/include/core/ProcessorProxy.h b/libminifi/include/core/ProcessorProxy.h deleted file mode 100644 index 0ad48bb3e..000000000 --- a/libminifi/include/core/ProcessorProxy.h +++ /dev/null @@ -1,281 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include <algorithm> -#include <atomic> -#include <chrono> -#include <condition_variable> -#include <functional> -#include <memory> -#include <mutex> -#include <string> -#include <string_view> -#include <unordered_set> -#include <unordered_map> -#include <utility> -#include <vector> - -#include "core/ConfigurableComponentImpl.h" -#include "core/Connectable.h" -#include "core/Property.h" -#include "core/Core.h" -#include "minifi-cpp/core/Annotation.h" -#include "minifi-cpp/core/DynamicProperty.h" -#include "minifi-cpp/core/Scheduling.h" -#include "minifi-cpp/core/state/nodes/MetricsBase.h" -#include "minifi-cpp/core/ProcessorMetrics.h" -#include "utils/gsl.h" -#include "utils/Id.h" -#include "minifi-cpp/core/OutputAttributeDefinition.h" -#include "Processor.h" -#include "minifi-cpp/core/Processor.h" - -namespace org::apache::nifi::minifi { - -class Connection; - -namespace core { - -class ProcessContext; -class ProcessSession; -class ProcessSessionFactory; - -constexpr std::chrono::microseconds MINIMUM_SCHEDULING_PERIOD{30}; - -#define BUILDING_DLL 1 - -class ProcessorProxy : public virtual Processor, public ConnectableImpl, public ConfigurableComponentImpl { - public: - ProcessorProxy(std::string_view name, const utils::Identifier& uuid, std::unique_ptr<ProcessorApi> impl); - explicit ProcessorProxy(std::string_view name, std::unique_ptr<ProcessorApi> impl); - - ProcessorProxy(const ProcessorProxy& parent) = delete; - ProcessorProxy& operator=(const ProcessorProxy& parent) = delete; - - bool isRunning() const override; - - ~ProcessorProxy() override; - - void setScheduledState(ScheduledState state) override; - - ScheduledState getScheduledState() const override { - return state_; - } - - void setSchedulingStrategy(SchedulingStrategy strategy) override { - strategy_ = strategy; - } - - SchedulingStrategy getSchedulingStrategy() const override { - return strategy_; - } - - void setSchedulingPeriod(std::chrono::steady_clock::duration period) override { - scheduling_period_ = std::max(std::chrono::steady_clock::duration(MINIMUM_SCHEDULING_PERIOD), period); - } - - std::chrono::steady_clock::duration getSchedulingPeriod() const override { - return scheduling_period_; - } - - void setCronPeriod(const std::string &period) override { - cron_period_ = period; - } - - std::string getCronPeriod() const override { - return cron_period_; - } - - void setRunDurationNano(std::chrono::steady_clock::duration period) override { - run_duration_ = period; - } - - std::chrono::steady_clock::duration getRunDurationNano() const override { - return (run_duration_); - } - - void setYieldPeriodMsec(std::chrono::milliseconds period) override { - yield_period_ = period; - } - - std::chrono::steady_clock::duration getYieldPeriod() const override { - return yield_period_; - } - - void setPenalizationPeriod(std::chrono::milliseconds period) override { - penalization_period_ = period; - } - - void setMaxConcurrentTasks(uint8_t tasks) override; - - bool isSingleThreaded() const override { - return impl_->isSingleThreaded(); - } - - std::string getProcessorType() const override { - return impl_->getProcessorType(); - } - - bool getTriggerWhenEmpty() const override { - return impl_->getTriggerWhenEmpty(); - } - - uint8_t getActiveTasks() const override { - return (active_tasks_); - } - - void incrementActiveTasks() override { - ++active_tasks_; - } - - void decrementActiveTask() override { - if (active_tasks_ > 0) - --active_tasks_; - } - - void clearActiveTask() override { - active_tasks_ = 0; - } - - std::string getProcessGroupUUIDStr() const override { - return process_group_uuid_; - } - - void setProcessGroupUUIDStr(const std::string &uuid) override { - process_group_uuid_ = uuid; - } - - void yield() override; - - void yield(std::chrono::steady_clock::duration delta_time) override; - - bool isYield() override; - - void clearYield() override; - - std::chrono::steady_clock::time_point getYieldExpirationTime() const override { return yield_expiration_; } - std::chrono::steady_clock::duration getYieldTime() const override; - - bool addConnection(Connectable* connection) override; - - bool canEdit() override { - return !isRunning(); - } - - void initialize() override; - - void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) override; - void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) override; - - void onTrigger(ProcessContext& context, ProcessSession& session) override { - impl_->onTrigger(context, session); - } - - void onSchedule(ProcessContext& context, ProcessSessionFactory& session_factory) override { - impl_->onSchedule(context, session_factory); - } - - // Hook executed when onSchedule fails (throws). Configuration should be reset in this - void onUnSchedule() override { - impl_->onUnSchedule(); - } - - // Check all incoming connections for work - bool isWorkAvailable() override; - - bool isThrottledByBackpressure() const override; - - Connectable* pickIncomingConnection() override; - - void validateAnnotations() const override; - - annotation::Input getInputRequirement() const override { - return impl_->getInputRequirement(); - } - - [[nodiscard]] bool supportsDynamicProperties() const override { - return impl_->supportsDynamicProperties(); - } - - [[nodiscard]] bool supportsDynamicRelationships() const override { - return impl_->supportsDynamicRelationships(); - } - - state::response::SharedResponseNode getResponseNode() override { - return getMetrics(); - } - - gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const override { - return impl_->getMetrics(); - } - - ProcessorApi& getImpl() const override { - gsl_Assert(impl_); - return *impl_; - } - - void restore(const std::shared_ptr<FlowFile>& file) override { - impl_->restore(file); - } - - static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{}; - - static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{}; - - protected: - std::atomic<ScheduledState> state_; - - std::atomic<std::chrono::steady_clock::duration> scheduling_period_; - std::atomic<std::chrono::steady_clock::duration> run_duration_; - std::atomic<std::chrono::steady_clock::duration> yield_period_; - - std::atomic<uint8_t> active_tasks_; - - std::string cron_period_; - - std::shared_ptr<logging::Logger> logger_; - - private: - mutable std::mutex mutex_; - std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{}; - - static std::mutex& getGraphMutex() { - static std::mutex mutex{}; - return mutex; - } - - // must hold the graphMutex - void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) override; - - const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const override { - return reachable_processors_; - } - - static bool partOfCycle(Connection* conn); - - // an outgoing connection allows us to reach these nodes - std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_; - - std::string process_group_uuid_; - - protected: - std::unique_ptr<ProcessorApi> impl_; -}; - -} // namespace core -} // namespace org::apache::nifi::minifi diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp index 9d56c36e5..aa4be367c 100644 --- a/libminifi/src/core/ClassLoader.cpp +++ b/libminifi/src/core/ClassLoader.cpp @@ -23,7 +23,7 @@ #include "core/logging/LoggerFactory.h" #include "range/v3/action/sort.hpp" #include "range/v3/action/unique.hpp" -#include "core/ProcessorProxy.h" +#include "core/Processor.h" namespace org { namespace apache { @@ -120,7 +120,7 @@ class ProcessorFactoryWrapper : public ObjectFactoryImpl { CoreComponent* createRaw(const std::string &name, const utils::Identifier &uuid) override { auto logger = logging::LoggerFactoryBase::getAliasedLogger(getClassName(), uuid); - return new ProcessorProxy(name, uuid, factory_->create({.uuid = uuid, .name = name, .logger = logger})); + return new Processor(name, uuid, factory_->create({.uuid = uuid, .name = name, .logger = logger})); } std::string getGroupName() const override { diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 8c118a635..2e1659dc8 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -26,7 +26,6 @@ #include "processors/ProcessorUtils.h" #include "utils/StringUtils.h" #include "utils/file/FileUtils.h" -#include "core/ProcessorProxy.h" namespace org::apache::nifi::minifi::core { diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index c195b3191..a70867ce7 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -38,14 +38,21 @@ #include "range/v3/algorithm/any_of.hpp" #include "fmt/format.h" #include "Exception.h" -#include "core/ProcessorProxy.h" +#include "core/Processor.h" #include "core/ProcessorMetrics.h" using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core { -ProcessorProxy::ProcessorProxy(std::string_view name, std::unique_ptr<ProcessorApi> impl) +constexpr std::chrono::microseconds MINIMUM_SCHEDULING_PERIOD{30}; + +static std::mutex& getGraphMutex() { + static std::mutex mutex{}; + return mutex; +} + +Processor::Processor(std::string_view name, std::unique_ptr<ProcessorApi> impl) : ConnectableImpl(name), state_(DISABLED), scheduling_period_(MINIMUM_SCHEDULING_PERIOD), @@ -63,7 +70,7 @@ ProcessorProxy::ProcessorProxy(std::string_view name, std::unique_ptr<ProcessorA logger_->log_debug("Processor {} created UUID {}", name_, getUUIDStr()); } -ProcessorProxy::ProcessorProxy(std::string_view name, const utils::Identifier& uuid, std::unique_ptr<ProcessorApi> impl) +Processor::Processor(std::string_view name, const utils::Identifier& uuid, std::unique_ptr<ProcessorApi> impl) : ConnectableImpl(name, uuid), state_(DISABLED), scheduling_period_(MINIMUM_SCHEDULING_PERIOD), @@ -81,22 +88,22 @@ ProcessorProxy::ProcessorProxy(std::string_view name, const utils::Identifier& u logger_->log_debug("Processor {} created with uuid {}", name_, getUUIDStr()); } -ProcessorProxy::~ProcessorProxy() { +Processor::~Processor() { logger_->log_debug("Destroying processor {} with uuid {}", name_, getUUIDStr()); } -bool ProcessorProxy::isRunning() const { +bool Processor::isRunning() const { return (state_ == RUNNING && active_tasks_ > 0); } -void ProcessorProxy::setScheduledState(ScheduledState state) { +void Processor::setScheduledState(ScheduledState state) { state_ = state; if (state == STOPPED) { impl_->notifyStop(); } } -bool ProcessorProxy::addConnection(Connectable* conn) { +bool Processor::addConnection(Connectable* conn) { enum class SetAs{ NONE, OUTPUT, @@ -166,7 +173,7 @@ bool ProcessorProxy::addConnection(Connectable* conn) { return result != SetAs::NONE; } -void ProcessorProxy::triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) { +void Processor::triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) { const auto process_session = session_factory->createSession(); process_session->setMetrics(getMetrics()); try { @@ -184,14 +191,14 @@ void ProcessorProxy::triggerAndCommit(const std::shared_ptr<ProcessContext>& con } } -void ProcessorProxy::trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) { +void Processor::trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) { ++impl_->getMetrics()->invocations(); const auto start = std::chrono::steady_clock::now(); onTrigger(*context, *process_session); impl_->getMetrics()->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); } -bool ProcessorProxy::isWorkAvailable() { +bool Processor::isWorkAvailable() { // We have work if any incoming connection has work std::lock_guard<std::mutex> lock(mutex_); bool hasWork = false; @@ -217,7 +224,7 @@ bool ProcessorProxy::isWorkAvailable() { } // must hold the graphMutex -void ProcessorProxy::updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force) { +void Processor::updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force) { bool didChange = force; for (auto& outIt : outgoing_connections_) { for (auto& outConn : outIt.second) { @@ -257,7 +264,7 @@ void ProcessorProxy::updateReachability(const std::lock_guard<std::mutex>& graph } } -bool ProcessorProxy::partOfCycle(Connection* conn) { +bool Processor::partOfCycle(Connection* conn) { auto source = dynamic_cast<Processor*>(conn->getSource()); if (!source) { return false; @@ -269,7 +276,7 @@ bool ProcessorProxy::partOfCycle(Connection* conn) { return it->second.contains(source); } -bool ProcessorProxy::isThrottledByBackpressure() const { +bool Processor::isThrottledByBackpressure() const { bool isThrottledByOutgoing = ranges::any_of(outgoing_connections_, [](auto& name_connection_set_pair) { return ranges::any_of(name_connection_set_pair.second, [](auto& connectable) { auto connection = dynamic_cast<Connection*>(connectable); @@ -283,7 +290,7 @@ bool ProcessorProxy::isThrottledByBackpressure() const { return isThrottledByOutgoing && !isForcedByIncomingCycle; } -Connectable* ProcessorProxy::pickIncomingConnection() { +Connectable* Processor::pickIncomingConnection() { std::lock_guard<std::mutex> rel_guard(relationship_mutex_); auto beginIt = incoming_connections_Iter; @@ -303,7 +310,7 @@ Connectable* ProcessorProxy::pickIncomingConnection() { return getNextIncomingConnectionImpl(rel_guard); } -void ProcessorProxy::validateAnnotations() const { +void Processor::validateAnnotations() const { switch (getInputRequirement()) { case annotation::Input::INPUT_REQUIRED: { if (!hasIncomingConnections()) { @@ -323,7 +330,7 @@ void ProcessorProxy::validateAnnotations() const { } } -void ProcessorProxy::setMaxConcurrentTasks(const uint8_t tasks) { +void Processor::setMaxConcurrentTasks(const uint8_t tasks) { if (isSingleThreaded() && tasks > 1) { logger_->log_warn("Processor {} can not be run in parallel, its \"max concurrent tasks\" value is too high. " "It was set to 1 from {}.", name_, tasks); @@ -334,23 +341,23 @@ void ProcessorProxy::setMaxConcurrentTasks(const uint8_t tasks) { max_concurrent_tasks_ = tasks; } -void ProcessorProxy::yield() { +void Processor::yield() { yield_expiration_ = std::chrono::steady_clock::now() + yield_period_.load(); } -void ProcessorProxy::yield(std::chrono::steady_clock::duration delta_time) { +void Processor::yield(std::chrono::steady_clock::duration delta_time) { yield_expiration_ = std::chrono::steady_clock::now() + delta_time; } -bool ProcessorProxy::isYield() { +bool Processor::isYield() { return getYieldTime() > 0ms; } -void ProcessorProxy::clearYield() { +void Processor::clearYield() { yield_expiration_ = std::chrono::steady_clock::time_point(); } -std::chrono::steady_clock::duration ProcessorProxy::getYieldTime() const { +std::chrono::steady_clock::duration Processor::getYieldTime() const { return std::max(yield_expiration_.load()-std::chrono::steady_clock::now(), std::chrono::steady_clock::duration{0}); } @@ -358,7 +365,7 @@ namespace { class ProcessorDescriptorImpl : public ProcessorDescriptor { public: - explicit ProcessorDescriptorImpl(ProcessorProxy* impl): impl_(impl) {} + explicit ProcessorDescriptorImpl(Processor* impl): impl_(impl) {} void setSupportedRelationships(std::span<const RelationshipDefinition> relationships) override { impl_->setSupportedRelationships(relationships); } @@ -368,14 +375,148 @@ class ProcessorDescriptorImpl : public ProcessorDescriptor { } private: - ProcessorProxy* impl_; + Processor* impl_; }; } // namespace -void ProcessorProxy::initialize() { +void Processor::initialize() { ProcessorDescriptorImpl self{this}; impl_->initialize(self); } +ScheduledState Processor::getScheduledState() const { + return state_; +} + +void Processor::setSchedulingStrategy(SchedulingStrategy strategy) { + strategy_ = strategy; +} + +SchedulingStrategy Processor::getSchedulingStrategy() const { + return strategy_; +} + +void Processor::setSchedulingPeriod(std::chrono::steady_clock::duration period) { + scheduling_period_ = std::max(std::chrono::steady_clock::duration(MINIMUM_SCHEDULING_PERIOD), period); +} + +std::chrono::steady_clock::duration Processor::getSchedulingPeriod() const { + return scheduling_period_; +} + +void Processor::setCronPeriod(const std::string &period) { + cron_period_ = period; +} + +std::string Processor::getCronPeriod() const { + return cron_period_; +} + +void Processor::setRunDurationNano(std::chrono::steady_clock::duration period) { + run_duration_ = period; +} + +std::chrono::steady_clock::duration Processor::getRunDurationNano() const { + return (run_duration_); +} + +void Processor::setYieldPeriodMsec(std::chrono::milliseconds period) { + yield_period_ = period; +} + +std::chrono::steady_clock::duration Processor::getYieldPeriod() const { + return yield_period_; +} + +void Processor::setPenalizationPeriod(std::chrono::milliseconds period) { + penalization_period_ = period; +} + +bool Processor::isSingleThreaded() const { + return impl_->isSingleThreaded(); +} + +std::string Processor::getProcessorType() const { + return impl_->getProcessorType(); +} + +bool Processor::getTriggerWhenEmpty() const { + return impl_->getTriggerWhenEmpty(); +} + +uint8_t Processor::getActiveTasks() const { + return (active_tasks_); +} + +void Processor::incrementActiveTasks() { + ++active_tasks_; +} + +void Processor::decrementActiveTask() { + if (active_tasks_ > 0) + --active_tasks_; +} + +void Processor::clearActiveTask() { + active_tasks_ = 0; +} + +std::string Processor::getProcessGroupUUIDStr() const { + return process_group_uuid_; +} + +void Processor::setProcessGroupUUIDStr(const std::string &uuid) { + process_group_uuid_ = uuid; +} + +std::chrono::steady_clock::time_point Processor::getYieldExpirationTime() const { + return yield_expiration_; +} + +bool Processor::canEdit() { + return !isRunning(); +} + +void Processor::onTrigger(ProcessContext& context, ProcessSession& session) { + impl_->onTrigger(context, session); +} + +void Processor::onSchedule(ProcessContext& context, ProcessSessionFactory& session_factory) { + impl_->onSchedule(context, session_factory); +} + +// Hook executed when onSchedule fails (throws). Configuration should be reset in this +void Processor::onUnSchedule() { + impl_->onUnSchedule(); +} + +annotation::Input Processor::getInputRequirement() const { + return impl_->getInputRequirement(); +} + +[[nodiscard]] bool Processor::supportsDynamicProperties() const { + return impl_->supportsDynamicProperties(); +} + +[[nodiscard]] bool Processor::supportsDynamicRelationships() const { + return impl_->supportsDynamicRelationships(); +} + +state::response::SharedResponseNode Processor::getResponseNode() { + return getMetrics(); +} + +gsl::not_null<std::shared_ptr<ProcessorMetrics>> Processor::getMetrics() const { + return impl_->getMetrics(); +} + +void Processor::restore(const std::shared_ptr<FlowFile>& file) { + impl_->restore(file); +} + +const std::unordered_map<Connection*, std::unordered_set<Processor*>>& Processor::reachable_processors() const { + return reachable_processors_; +} + } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index 61f8d8582..667aa3205 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -24,7 +24,7 @@ #include "Funnel.h" #include "core/ParameterContext.h" -#include "core/ProcessorProxy.h" +#include "core/Processor.h" #include "core/ParameterTokenParser.h" #include "core/ReferenceParser.h" #include "core/flow/CheckRequiredField.h" @@ -584,7 +584,7 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P auto report_task_impl = createProvenanceReportTask(); auto* reportTask = report_task_impl.get(); - auto report_task_wrapper = std::make_unique<core::ProcessorProxy>("", std::move(report_task_impl)); + auto report_task_wrapper = std::make_unique<core::Processor>("", std::move(report_task_impl)); checkRequiredField(node, schema_.scheduling_strategy); auto schedulingStrategyStr = node[schema_.scheduling_strategy].getString().value(); @@ -747,7 +747,7 @@ void StructuredConfiguration::parseRPGPort(const Node& port_node, core::ProcessG auto port_impl = std::make_unique<minifi::RemoteProcessorGroupPort>( nameStr, parent->getURL(), this->configuration_, uuid); auto* port = port_impl.get(); - auto port_wrapper = std::make_unique<core::ProcessorProxy>(nameStr, uuid, std::move(port_impl)); + auto port_wrapper = std::make_unique<core::Processor>(nameStr, uuid, std::move(port_impl)); port->setDirection(direction); port->setTimeout(parent->getTimeout()); port->setTransmitting(true); @@ -952,7 +952,7 @@ void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup* throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format."); }); - auto funnel = std::make_unique<core::ProcessorProxy>(name, uuid.value(), std::make_unique<minifi::Funnel>(name, uuid.value())); + auto funnel = std::make_unique<core::Processor>(name, uuid.value(), std::make_unique<minifi::Funnel>(name, uuid.value())); logger_->log_debug("Created funnel with UUID {} and name {}", id, name); funnel->setScheduledState(core::RUNNING); funnel->setSchedulingStrategy(core::EVENT_DRIVEN); @@ -980,7 +980,7 @@ void StructuredConfiguration::parsePorts(const flow::Node& node, core::ProcessGr throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect port UUID format."); }); - auto port = std::make_unique<PortProxy>(name, uuid.value(), std::make_unique<PortImpl>(name, uuid.value(), port_type)); + auto port = std::make_unique<Port>(name, uuid.value(), std::make_unique<PortImpl>(name, uuid.value(), port_type)); logger_->log_debug("Created port UUID {} and name {}", id, name); port->setScheduledState(core::RUNNING); port->setSchedulingStrategy(core::EVENT_DRIVEN); diff --git a/libminifi/test/libtest/unit/TestUtils.h b/libminifi/test/libtest/unit/TestUtils.h index 2c2ef7bcb..ff3b435a3 100644 --- a/libminifi/test/libtest/unit/TestUtils.h +++ b/libminifi/test/libtest/unit/TestUtils.h @@ -39,7 +39,7 @@ #include "asio/ssl.hpp" #include "utils/net/Ssl.h" #include "range/v3/algorithm/any_of.hpp" -#include "core/ProcessorProxy.h" +#include "core/Processor.h" #include "core/logging/LoggerFactory.h" using namespace std::literals::chrono_literals; @@ -243,7 +243,7 @@ std::unique_ptr<core::Processor> make_processor(std::string_view name, std::opti .name = std::string{name}, .logger = minifi::core::logging::LoggerFactory<T>::getLogger(uuid.value()) }); - return std::make_unique<core::ProcessorProxy>(name, uuid.value(), std::move(processor_impl)); + return std::make_unique<core::Processor>(name, uuid.value(), std::move(processor_impl)); } template<typename T, typename ...Args> @@ -251,7 +251,7 @@ std::unique_ptr<core::Processor> make_custom_processor(Args&&... args) { auto processor_impl = std::make_unique<T>(std::forward<Args>(args)...); auto name = processor_impl->getName(); auto uuid = processor_impl->getUUID(); - return std::make_unique<core::ProcessorProxy>(name, uuid, std::move(processor_impl)); + return std::make_unique<core::Processor>(name, uuid, std::move(processor_impl)); } } // namespace org::apache::nifi::minifi::test::utils diff --git a/minifi-api/include/minifi-cpp/core/ProcessContext.h b/minifi-api/include/minifi-cpp/core/ProcessContext.h index d925d9f55..629d06c28 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessContext.h +++ b/minifi-api/include/minifi-cpp/core/ProcessContext.h @@ -24,7 +24,6 @@ #include "minifi-cpp/core/Core.h" #include "minifi-cpp/core/ContentRepository.h" #include "minifi-cpp/core/controller/ControllerServiceLookup.h" -#include "minifi-cpp/core/Processor.h" #include "minifi-cpp/core/Property.h" #include "minifi-cpp/core/Repository.h" #include "minifi-cpp/core/FlowFile.h" diff --git a/minifi-api/include/minifi-cpp/core/ProcessContextBuilder.h b/minifi-api/include/minifi-cpp/core/ProcessContextBuilder.h index 18d8ec27f..7607dfee2 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessContextBuilder.h +++ b/minifi-api/include/minifi-cpp/core/ProcessContextBuilder.h @@ -22,7 +22,6 @@ #include "minifi-cpp/properties/Configure.h" #include "minifi-cpp/core/controller/ControllerServiceProvider.h" #include "ProcessContext.h" -#include "Processor.h" #include "minifi-cpp/core/Repository.h" namespace org::apache::nifi::minifi::core { diff --git a/minifi-api/include/minifi-cpp/core/Processor.h b/minifi-api/include/minifi-cpp/core/ProcessorApi.h similarity index 100% rename from minifi-api/include/minifi-cpp/core/Processor.h rename to minifi-api/include/minifi-cpp/core/ProcessorApi.h diff --git a/minifi-api/include/minifi-cpp/core/ProcessorFactory.h b/minifi-api/include/minifi-cpp/core/ProcessorFactory.h index 7b453170f..ed27ebf2e 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessorFactory.h +++ b/minifi-api/include/minifi-cpp/core/ProcessorFactory.h @@ -20,11 +20,12 @@ #include <string> #include <memory> #include <utility> -#include "minifi-cpp/core/Processor.h" #include "minifi-cpp/core/ProcessorMetadata.h" namespace org::apache::nifi::minifi::core { +class ProcessorApi; + class ProcessorFactory { public: virtual std::unique_ptr<ProcessorApi> create(ProcessorMetadata info) = 0; diff --git a/utils/include/core/Processor.h b/utils/include/core/Processor.h deleted file mode 100644 index 3792098b7..000000000 --- a/utils/include/core/Processor.h +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include "core/ProcessorImpl.h" diff --git a/utils/include/core/ProcessorImpl.h b/utils/include/core/ProcessorImpl.h index 225e2b243..23ba2f516 100644 --- a/utils/include/core/ProcessorImpl.h +++ b/utils/include/core/ProcessorImpl.h @@ -42,7 +42,7 @@ #include "utils/gsl.h" #include "utils/Id.h" #include "minifi-cpp/core/OutputAttributeDefinition.h" -#include "minifi-cpp/core/Processor.h" +#include "minifi-cpp/core/ProcessorApi.h" #include "utils/PropertyErrors.h" #include "minifi-cpp/core/ProcessorMetadata.h" #include "Exception.h" diff --git a/utils/src/core/Processor.cpp b/utils/src/core/ProcessorImpl.cpp similarity index 98% rename from utils/src/core/Processor.cpp rename to utils/src/core/ProcessorImpl.cpp index 0caaf9c67..460ba47ff 100644 --- a/utils/src/core/Processor.cpp +++ b/utils/src/core/ProcessorImpl.cpp @@ -17,7 +17,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include <ctime> #include <cctype> diff --git a/utils/src/core/ProcessorMetrics.cpp b/utils/src/core/ProcessorMetrics.cpp index 5d3e7be33..9952f74b4 100644 --- a/utils/src/core/ProcessorMetrics.cpp +++ b/utils/src/core/ProcessorMetrics.cpp @@ -16,7 +16,7 @@ */ #include "core/ProcessorMetrics.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/state/Value.h" #include "utils/gsl.h" #include "range/v3/numeric/accumulate.hpp"
