This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f999498 MINIFICPP-1566 - Annotate maximum allowed threads for processors
f999498 is described below
commit f999498a11cb449b4185b7b4320896409cde4050
Author: Adam Markovics <[email protected]>
AuthorDate: Fri Oct 22 13:52:38 2021 +0200
MINIFICPP-1566 - Annotate maximum allowed threads for processors
Signed-off-by: Adam Debreceni <[email protected]>
This closes #1191
---
extensions/aws/processors/DeleteS3Object.h | 4 ++
extensions/aws/processors/FetchS3Object.h | 4 ++
extensions/aws/processors/PutS3Object.h | 4 ++
.../azure/processors/AzureStorageProcessorBase.h | 1 -
.../azure/processors/PutAzureBlobStorage.cpp | 23 ++++-------
extensions/azure/processors/PutAzureBlobStorage.h | 8 ++++
.../azure/processors/PutAzureDataLakeStorage.cpp | 11 ++---
.../azure/processors/PutAzureDataLakeStorage.h | 4 ++
extensions/opc/include/fetchopc.h | 2 -
extensions/opc/include/opcbase.h | 4 ++
extensions/opc/include/putopc.h | 3 --
extensions/opc/src/fetchopc.cpp | 7 ----
extensions/opc/src/putopc.cpp | 7 ----
.../SourceInitiatedSubscriptionListener.h | 4 ++
extensions/pdh/PerformanceDataMonitor.h | 8 ++++
extensions/script/python/PythonCreator.h | 1 +
extensions/sftp/processors/ListSFTP.h | 5 ++-
extensions/sql/processors/SQLProcessor.cpp | 2 -
extensions/sql/processors/SQLProcessor.h | 12 +++---
.../standard-processors/processors/TailFile.cpp | 5 ---
.../standard-processors/processors/TailFile.h | 5 ++-
.../tests/unit/ManifestTests.cpp | 18 +++++----
.../tests/unit/ProcessorTests.cpp | 47 ++++++++++++++++++++++
extensions/systemd/ConsumeJournald.h | 4 ++
.../windows-event-log/ConsumeWindowsEventLog.cpp | 13 +++---
.../windows-event-log/ConsumeWindowsEventLog.h | 5 ++-
libminifi/include/agent/build_description.h | 2 +
libminifi/include/core/Connectable.h | 2 +-
libminifi/include/core/Processor.h | 13 +++---
libminifi/include/core/ProcessorNode.h | 2 +-
.../include/core/state/nodes/AgentInformation.h | 5 +++
libminifi/src/core/Processor.cpp | 16 ++++++--
32 files changed, 167 insertions(+), 84 deletions(-)
diff --git a/extensions/aws/processors/DeleteS3Object.h b/extensions/aws/processors/DeleteS3Object.h
index 5927d0c..e59b9f3 100644
--- a/extensions/aws/processors/DeleteS3Object.h
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -65,6 +65,10 @@ class DeleteS3Object : public S3Processor {
return core::annotation::Input::INPUT_REQUIRED;
}
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
friend class ::S3TestsFixture<DeleteS3Object>;
explicit DeleteS3Object(const std::string& name, const
minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender>
s3_request_sender)
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 366a89a..14b1e98 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -94,6 +94,10 @@ class FetchS3Object : public S3Processor {
return core::annotation::Input::INPUT_REQUIRED;
}
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
friend class ::S3TestsFixture<FetchS3Object>;
explicit FetchS3Object(const std::string& name, const
minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender>
s3_request_sender)
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index 09f3f1e..b434bef 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -124,6 +124,10 @@ class PutS3Object : public S3Processor {
return core::annotation::Input::INPUT_REQUIRED;
}
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
friend class ::S3TestsFixture<PutS3Object>;
explicit PutS3Object(const std::string& name, const
minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender>
s3_request_sender)
diff --git a/extensions/azure/processors/AzureStorageProcessorBase.h b/extensions/azure/processors/AzureStorageProcessorBase.h
index 880f822..6017b85 100644
--- a/extensions/azure/processors/AzureStorageProcessorBase.h
+++ b/extensions/azure/processors/AzureStorageProcessorBase.h
@@ -51,7 +51,6 @@ class AzureStorageProcessorBase : public core::Processor {
std::tuple<GetCredentialsFromControllerResult,
std::optional<storage::AzureStorageCredentials>>
getCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext>
&context) const;
- std::mutex azure_storage_mutex_;
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp b/extensions/azure/processors/PutAzureBlobStorage.cpp
index 194b76c..e5f66fd 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.cpp
+++ b/extensions/azure/processors/PutAzureBlobStorage.cpp
@@ -240,23 +240,16 @@ void PutAzureBlobStorage::onTrigger(const
std::shared_ptr<core::ProcessContext>
return;
}
- std::optional<storage::UploadBlobResult> upload_result;
- {
- // TODO(lordgamez): This can be removed after maximum allowed threads are
implemented. See https://issues.apache.org/jira/browse/MINIFICPP-1566
- // When used in multithreaded environment make sure to use the
azure_storage_mutex_ to lock the wrapper so the
- // client is not reset with different configuration while another thread
is using it.
- std::lock_guard<std::mutex> lock(azure_storage_mutex_);
- if (create_container_) {
- auto result = azure_blob_storage_.createContainerIfNotExists(*params);
- if (!result) {
- session->transfer(flow_file, Failure);
- return;
- }
+ if (create_container_) {
+ auto result = azure_blob_storage_.createContainerIfNotExists(*params);
+ if (!result) {
+ session->transfer(flow_file, Failure);
+ return;
}
- PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(),
azure_blob_storage_, *params);
- session->read(flow_file, &callback);
- upload_result = callback.getResult();
}
+ PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(),
azure_blob_storage_, *params);
+ session->read(flow_file, &callback);
+ const std::optional<storage::UploadBlobResult> upload_result =
callback.getResult();
if (!upload_result) {
logger_->log_error("Failed to upload blob '%s' to Azure Storage container
'%s'", params->blob_name, params->container_name);
diff --git a/extensions/azure/processors/PutAzureBlobStorage.h b/extensions/azure/processors/PutAzureBlobStorage.h
index bfd71fd..6de322f 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.h
+++ b/extensions/azure/processors/PutAzureBlobStorage.h
@@ -100,6 +100,14 @@ class PutAzureBlobStorage final : public
AzureStorageProcessorBase {
, azure_blob_storage_(std::move(blob_storage_client)) {
}
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_REQUIRED;
+ }
+
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
storage::AzureStorageCredentials getAzureCredentialsFromProperties(
const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file) const;
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index c07c3b6..d48998e 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -124,14 +124,9 @@ void PutAzureDataLakeStorage::onTrigger(const
std::shared_ptr<core::ProcessConte
return;
}
- storage::UploadDataLakeStorageResult result;
- {
- // TODO(lordgamez): This can be removed after maximum allowed threads are
implemented. See https://issues.apache.org/jira/browse/MINIFICPP-1566
- std::lock_guard<std::mutex> lock(azure_storage_mutex_);
- PutAzureDataLakeStorage::ReadCallback callback(flow_file->getSize(),
azure_data_lake_storage_, *params, logger_);
- session->read(flow_file, &callback);
- result = callback.getResult();
- }
+ PutAzureDataLakeStorage::ReadCallback callback(flow_file->getSize(),
azure_data_lake_storage_, *params, logger_);
+ session->read(flow_file, &callback);
+ const storage::UploadDataLakeStorageResult result = callback.getResult();
if (result.result_code == storage::UploadResultCode::FILE_ALREADY_EXISTS) {
gsl_Expects(conflict_resolution_strategy_ !=
FileExistsResolutionStrategy::REPLACE_FILE);
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index d842cdc..c924cd6 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -88,6 +88,10 @@ class PutAzureDataLakeStorage final : public
AzureStorageProcessorBase {
return core::annotation::Input::INPUT_REQUIRED;
}
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
explicit PutAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid,
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
: AzureStorageProcessorBase(name, uuid,
logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger()),
azure_data_lake_storage_(std::move(data_lake_storage_client)) {
diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h
index ed391a3..fb4471f 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -21,7 +21,6 @@
#include <memory>
#include <string>
#include <unordered_map>
-#include <mutex>
#include <vector>
#include "opc.h"
@@ -92,7 +91,6 @@ class FetchOPCProcessor : public BaseOPCProcessor {
bool lazy_mode_;
private:
- std::mutex onTriggerMutex_;
std::vector<UA_NodeId> translatedNodeIDs_; // Only used when user provides
path, path->nodeid translation is only done once
std::unordered_map<std::string, std::string> node_timestamp_; // Key = Full
path, Value = Timestamp
};
diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h
index ea4dc46..bebd765 100644
--- a/extensions/opc/include/opcbase.h
+++ b/extensions/opc/include/opcbase.h
@@ -54,6 +54,10 @@ class BaseOPCProcessor : public core::Processor {
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSessionFactory> &factory) override;
protected:
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
virtual bool reconnect();
std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h
index 0497693..8c2180d 100644
--- a/extensions/opc/include/putopc.h
+++ b/extensions/opc/include/putopc.h
@@ -22,7 +22,6 @@
#include <memory>
#include <string>
#include <vector>
-#include <mutex>
#include "opc.h"
#include "opcbase.h"
@@ -81,8 +80,6 @@ class PutOPCProcessor : public BaseOPCProcessor {
std::shared_ptr<logging::Logger> logger_;
};
- std::mutex onTriggerMutex_;
-
std::string nodeID_;
int32_t nameSpaceIdx_;
opc::OPCNodeIDType idType_;
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index cfb8c5d..cedade7 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -21,7 +21,6 @@
#include <string>
#include <list>
#include <map>
-#include <mutex>
#include <thread>
#include "opc.h"
@@ -136,12 +135,6 @@ namespace processors {
void FetchOPCProcessor::onTrigger(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) {
logger_->log_trace("FetchOPCProcessor::onTrigger");
- std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
- if (!lock.owns_lock()) {
- logger_->log_warn("processor was triggered before previous listing
finished, configuration should be revised!");
- return;
- }
-
if (!reconnect()) {
yield();
return;
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index 5936ce3..8e4368a 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -20,7 +20,6 @@
#include <list>
#include <map>
#include <memory>
-#include <mutex>
#include <string>
#include <optional>
#include <thread>
@@ -157,12 +156,6 @@ namespace processors {
void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSession> &session) {
logger_->log_trace("PutOPCProcessor::onTrigger");
- std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
- if (!lock.owns_lock()) {
- logger_->log_warn("processor was triggered before previous listing
finished, configuration should be revised!");
- return;
- }
-
if (!reconnect()) {
yield();
return;
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index e9f58d1..b00a2a4 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -113,6 +113,10 @@ class SourceInitiatedSubscriptionListener : public
core::Processor {
};
protected:
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<core::CoreComponentStateManager> state_manager_;
diff --git a/extensions/pdh/PerformanceDataMonitor.h b/extensions/pdh/PerformanceDataMonitor.h
index 3539741..b93c5cf 100644
--- a/extensions/pdh/PerformanceDataMonitor.h
+++ b/extensions/pdh/PerformanceDataMonitor.h
@@ -73,6 +73,14 @@ class PerformanceDataMonitor : public core::Processor {
JSON, OPENTELEMETRY
};
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_FORBIDDEN;
+ }
+
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
rapidjson::Value& prepareJSONBody(rapidjson::Document& root);
void setupMembersFromProperties(const std::shared_ptr<core::ProcessContext>&
context);
diff --git a/extensions/script/python/PythonCreator.h b/extensions/script/python/PythonCreator.h
index 4977fab..24d0abb 100644
--- a/extensions/script/python/PythonCreator.h
+++ b/extensions/script/python/PythonCreator.h
@@ -100,6 +100,7 @@ class PythonCreator : public minifi::core::CoreComponent {
minifi::ClassDescription description(fullname);
description.dynamic_properties_ =
processor->getPythonSupportDynamicProperties();
description.inputRequirement_ = processor->getInputRequirementAsString();
+ description.isSingleThreaded_ = processor->isSingleThreaded();
auto properties = processor->getPythonProperties();
minifi::AgentDocs::putDescription(scriptname, processor->getDescription());
diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h
index d981d34..9967b38 100644
--- a/extensions/sftp/processors/ListSFTP.h
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -60,7 +60,7 @@ class ListSFTP : public SFTPProcessorBase {
* Create a new processor
*/
explicit ListSFTP(const std::string& name, const utils::Identifier& uuid =
{});
- virtual ~ListSFTP();
+ ~ListSFTP() override;
// Supported Properties
static core::Property ListingStrategy;
@@ -102,6 +102,9 @@ class ListSFTP : public SFTPProcessorBase {
return core::annotation::Input::INPUT_FORBIDDEN;
}
+ bool isSingleThreaded() const override {
+ return true;
+ }
std::shared_ptr<core::CoreComponentStateManager> state_manager_;
std::string listing_strategy_;
diff --git a/extensions/sql/processors/SQLProcessor.cpp b/extensions/sql/processors/SQLProcessor.cpp
index 5238a1c..b9197f7 100644
--- a/extensions/sql/processors/SQLProcessor.cpp
+++ b/extensions/sql/processors/SQLProcessor.cpp
@@ -52,8 +52,6 @@ void SQLProcessor::onSchedule(const
std::shared_ptr<core::ProcessContext>& conte
}
void SQLProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession>& session) {
- std::lock_guard<std::mutex> guard(on_trigger_mutex_);
-
try {
if (!connection_) {
connection_ = db_service_->getConnection();
diff --git a/extensions/sql/processors/SQLProcessor.h b/extensions/sql/processors/SQLProcessor.h
index 6def80d..8da0de5 100644
--- a/extensions/sql/processors/SQLProcessor.h
+++ b/extensions/sql/processors/SQLProcessor.h
@@ -42,6 +42,10 @@ class SQLProcessor: public core::Processor {
: core::Processor(name, uuid), logger_(std::move(logger)) {
}
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
static std::vector<std::string> collectArguments(const
std::shared_ptr<core::FlowFile>& flow_file);
virtual void processOnSchedule(core::ProcessContext& context) = 0;
@@ -55,11 +59,9 @@ class SQLProcessor: public core::Processor {
connection_.reset();
}
- protected:
- std::shared_ptr<logging::Logger> logger_;
- std::shared_ptr<sql::controllers::DatabaseService> db_service_;
- std::unique_ptr<sql::Connection> connection_;
- std::mutex on_trigger_mutex_;
+ std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<sql::controllers::DatabaseService> db_service_;
+ std::unique_ptr<sql::Connection> connection_;
};
} // namespace processors
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 1dec544..6180a2b 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -33,7 +33,6 @@
#include <regex>
#include "range/v3/action/sort.hpp"
-#include "range/v3/algorithm/transform.hpp"
#include "io/CRCStream.h"
#include "utils/file/FileUtils.h"
@@ -335,8 +334,6 @@ void TailFile::initialize() {
}
void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory>&
/*sessionFactory*/) {
- std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
-
tail_states_.clear();
state_manager_ = context->getStateManager();
@@ -671,8 +668,6 @@ std::vector<TailState>
TailFile::sortAndSkipMainFilePrefix(const TailState &stat
}
void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &, const
std::shared_ptr<core::ProcessSession> &session) {
- std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
-
if (tail_mode_ == Mode::MULTIPLE) {
if (last_multifile_lookup_ + lookup_frequency_ <
std::chrono::steady_clock::now()) {
logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing
new multifile lookup", int64_t{lookup_frequency_.count()});
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 4d85505..dc2048c 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -140,6 +140,10 @@ class TailFile : public core::Processor {
return core::annotation::Input::INPUT_FORBIDDEN;
}
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
void parseStateFileLine(char *buf, std::map<std::string, TailState> &state)
const;
void processAllRotatedFiles(const std::shared_ptr<core::ProcessSession>
&session, TailState &state);
void processRotatedFiles(const std::shared_ptr<core::ProcessSession>
&session, TailState &state, std::vector<TailState> &rotated_file_states);
@@ -170,7 +174,6 @@ class TailFile : public core::Processor {
static const char *POSITION_STR;
static const int BUFFER_SIZE = 512;
- std::mutex tail_file_mutex_;
std::string delimiter_; // Delimiter for the data incoming from the tailed
file.
std::shared_ptr<core::CoreComponentStateManager> state_manager_;
std::map<std::string, TailState> tail_states_;
diff --git a/extensions/standard-processors/tests/unit/ManifestTests.cpp b/extensions/standard-processors/tests/unit/ManifestTests.cpp
index efe4b24..d6dfc47 100644
--- a/extensions/standard-processors/tests/unit/ManifestTests.cpp
+++ b/extensions/standard-processors/tests/unit/ManifestTests.cpp
@@ -17,12 +17,9 @@
*/
#include <memory>
-#include <algorithm>
-#include "core/Processor.h"
#include "core/state/nodes/DeviceInformation.h"
#include "core/state/nodes/AgentInformation.h"
#include "TestBase.h"
-#include "io/ClientSocket.h"
#include "core/ClassLoader.h"
// Include some processor headers to make sure they are part of the manifest
@@ -107,8 +104,17 @@ TEST_CASE("Test Relationships", "[rel1]") {
}
}
#ifndef WIN32
+ const auto& inputRequirement = proc_0.children[1];
+ REQUIRE(inputRequirement.name == "inputRequirement");
+ REQUIRE(inputRequirement.value.to_string() == "INPUT_REQUIRED");
+
+ const auto& isSingleThreaded = proc_0.children[2];
+ REQUIRE(isSingleThreaded.name == "isSingleThreaded");
+ REQUIRE(isSingleThreaded.value.getValue()->getTypeIndex() ==
org::apache::nifi::minifi::state::response::Value::BOOL_TYPE);
+ REQUIRE(isSingleThreaded.value.to_string() == "false");
+
REQUIRE(proc_0.children.size() > 0);
- const auto& relationships = proc_0.children[2];
+ const auto& relationships = proc_0.children[3];
REQUIRE("supportedRelationships" == relationships.name);
// this is because they are now nested
REQUIRE("supportedRelationships" == relationships.children[0].name);
@@ -118,10 +124,6 @@ TEST_CASE("Test Relationships", "[rel1]") {
REQUIRE("success" ==
relationships.children[1].children[0].value.to_string());
REQUIRE("description" == relationships.children[1].children[1].name);
-
- const auto& inputRequirement = proc_0.children[1];
- REQUIRE("inputRequirement" == inputRequirement.name);
- REQUIRE("INPUT_REQUIRED" == inputRequirement.value.to_string());
#endif
}
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 30b0344..0242704 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -779,3 +779,50 @@ TEST_CASE("InputRequirementTestRequired",
"[InputRequirement]") {
REQUIRE_THROWS_WITH(plan->validateAnnotations(),
Catch::EndsWith("INPUT_REQUIRED was specified for the processor, but no
incoming connections were found"));
testController.runSession(plan);
}
+
+TEST_CASE("isSingleThreaded - one thread for a multithreaded processor",
"[isSingleThreaded]") {
+ TestController testController;
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ auto processor = plan->addProcessor("GenerateFlowFile", "myProc");
+ // default max concurrent tasks value is 1 for every processor
+
+ REQUIRE_NOTHROW(plan->validateAnnotations());
+ REQUIRE(processor->getMaxConcurrentTasks() == 1);
+}
+
+TEST_CASE("isSingleThreaded - two threads for a multithreaded processor",
"[isSingleThreaded]") {
+ TestController testController;
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ auto processor = plan->addProcessor("GenerateFlowFile", "myProc");
+ processor->setMaxConcurrentTasks(2);
+
+ REQUIRE_NOTHROW(plan->validateAnnotations());
+ REQUIRE(processor->getMaxConcurrentTasks() == 2);
+}
+
+TEST_CASE("isSingleThreaded - one thread for a single threaded processor",
"[isSingleThreaded]") {
+ TestController testController;
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ auto processor = plan->addProcessor("TailFile", "myProc");
+ // default max concurrent tasks value is 1 for every processor
+
+ REQUIRE_NOTHROW(plan->validateAnnotations());
+ REQUIRE(processor->getMaxConcurrentTasks() == 1);
+}
+
+TEST_CASE("isSingleThreaded - two threads for a single threaded processor",
"[isSingleThreaded]") {
+ TestController testController;
+ LogTestController::getInstance().setDebug<minifi::core::Processor>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ auto processor = plan->addProcessor("TailFile", "myProc");
+ processor->setMaxConcurrentTasks(2);
+
+ REQUIRE_NOTHROW(plan->validateAnnotations());
+ REQUIRE(processor->getMaxConcurrentTasks() == 1);
+ REQUIRE(LogTestController::getInstance().contains("[warning] Processor
myProc can not be run in parallel, its "
+ "\"max concurrent tasks\"
value is too high. It was set to 1 from 2."));
+}
diff --git a/extensions/systemd/ConsumeJournald.h b/extensions/systemd/ConsumeJournald.h
index 25f86c9..ed48432 100644
--- a/extensions/systemd/ConsumeJournald.h
+++ b/extensions/systemd/ConsumeJournald.h
@@ -83,6 +83,10 @@ class ConsumeJournald final : public core::Processor {
std::chrono::system_clock::time_point timestamp;
};
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_FORBIDDEN;
+ }
+
static std::optional<gsl::span<const char>>
enumerateJournalEntry(libwrapper::Journal&);
static std::optional<journal_field> getNextField(libwrapper::Journal&);
std::future<std::pair<std::string, std::vector<journal_message>>>
getCursorAndMessageBatch();
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index b7e13a8..340b9e3 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -383,18 +383,18 @@ std::tuple<size_t, std::wstring>
ConsumeWindowsEventLog::processEventLogs(const
}
void ConsumeWindowsEventLog::onTrigger(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) {
- if (!bookmark_) {
- logger_->log_debug("bookmark_ is null");
- context->yield();
- return;
- }
-
std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
if (!lock.owns_lock()) {
logger_->log_warn("processor was triggered before previous listing
finished, configuration should be revised!");
return;
}
+ if (!bookmark_) {
+ logger_->log_debug("bookmark_ is null");
+ context->yield();
+ return;
+ }
+
logger_->log_trace("CWEL onTrigger");
size_t processed_event_count = 0;
@@ -439,7 +439,6 @@ void ConsumeWindowsEventLog::onTrigger(const
std::shared_ptr<core::ProcessContex
}
wel::WindowsEventLogHandler ConsumeWindowsEventLog::getEventLogHandler(const
std::string & name) {
- std::lock_guard<std::mutex> lock(cache_mutex_);
logger_->log_trace("Getting Event Log Handler corresponding to %s",
name.c_str());
auto provider = providers_.find(name);
if (provider != std::end(providers_)) {
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index 0283176..ea26a4d 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -137,6 +137,10 @@ class ConsumeWindowsEventLog : public core::Processor {
return core::annotation::Input::INPUT_FORBIDDEN;
}
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
bool commitAndSaveBookmark(const std::wstring &bookmarkXml, const
std::shared_ptr<core::ProcessSession> &session);
std::tuple<size_t, std::wstring> processEventLogs(const
std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE&
event_query_results);
@@ -156,7 +160,6 @@ class ConsumeWindowsEventLog : public core::Processor {
std::string computerName_;
uint64_t maxBufferSize_{};
DWORD lastActivityTimestamp_{};
- std::mutex cache_mutex_;
std::map<std::string, wel::WindowsEventLogHandler > providers_;
uint64_t batch_commit_size_{};
diff --git a/libminifi/include/agent/build_description.h b/libminifi/include/agent/build_description.h
index d6810b9..ba4f967 100644
--- a/libminifi/include/agent/build_description.h
+++ b/libminifi/include/agent/build_description.h
@@ -60,6 +60,7 @@ class ClassDescription {
std::vector<core::Relationship> class_relationships_;
bool dynamic_properties_ = false;
std::string inputRequirement_;
+ bool isSingleThreaded_ = false;
bool dynamic_relationships_ = false;
bool is_controller_service_ = false;
};
@@ -151,6 +152,7 @@ class BuildDescription {
description.dynamic_relationships_ =
component->supportsDynamicRelationships();
if (is_processor) {
description.inputRequirement_ =
processor->getInputRequirementAsString();
+ description.isSingleThreaded_ = processor->isSingleThreaded();
description.class_relationships_ =
processor->getSupportedRelationships();
class_mappings[group].processors_.emplace_back(description);
} else if (is_controller_service) {
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index 6b44c68..f77c24b 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -110,7 +110,7 @@ class Connectable : public CoreComponent {
return max_concurrent_tasks_;
}
- void setMaxConcurrentTasks(const uint8_t tasks) {
+ virtual void setMaxConcurrentTasks(uint8_t tasks) {
max_concurrent_tasks_ = tasks;
}
/**
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index b058feb..5855378 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -144,13 +144,14 @@ class Processor : public Connectable, public
ConfigurableComponent, public std::
}
// Set Processor Maximum Concurrent Tasks
- void setMaxConcurrentTasks(uint8_t tasks) {
- max_concurrent_tasks_ = tasks;
- }
- // Get Processor Maximum Concurrent Tasks
- uint8_t getMaxConcurrentTasks() const {
- return (max_concurrent_tasks_);
+ void setMaxConcurrentTasks(uint8_t tasks) override;
+
+ // Overriding to yield true can be used to indicate that the Processor is
not safe for concurrent execution
+ // of its onTrigger() method. By default, Processors are assumed to be safe
for concurrent execution.
+ virtual bool isSingleThreaded() const {
+ return false;
}
+
// Set Trigger when empty
void setTriggerWhenEmpty(bool value) {
_triggerWhenEmpty = value;
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index 557cc23..a0ee009 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -274,7 +274,7 @@ class ProcessorNode : public ConfigurableComponent, public
Connectable {
return processor_->getMaxConcurrentTasks();
}
- void setMaxConcurrentTasks(const uint8_t tasks) {
+ void setMaxConcurrentTasks(uint8_t tasks) override {
processor_->setMaxConcurrentTasks(tasks);
}
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index f804672..0231d22 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -254,6 +254,10 @@ class ComponentManifest : public DeviceInformation {
inputReq.name = "inputRequirement";
inputReq.value = group.inputRequirement_;
+ SerializedResponseNode isSingleThreaded;
+ isSingleThreaded.name = "isSingleThreaded";
+ isSingleThreaded.value = group.isSingleThreaded_;
+
SerializedResponseNode relationships;
relationships.name = "supportedRelationships";
relationships.array = true;
@@ -275,6 +279,7 @@ class ComponentManifest : public DeviceInformation {
relationships.children.push_back(child);
}
desc.children.push_back(inputReq);
+ desc.children.push_back(isSingleThreaded);
desc.children.push_back(relationships);
}
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index ddff35a..744e1a3 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -385,15 +385,14 @@ void Processor::validateAnnotations() const {
if (!hasIncomingConnections()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "INPUT_REQUIRED was
specified for the processor, but no incoming connections were found");
}
- return;
+ break;
}
case annotation::Input::INPUT_ALLOWED:
- return;
+ break;
case annotation::Input::INPUT_FORBIDDEN: {
if (hasIncomingConnections()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "INPUT_FORBIDDEN was
specified for the processor, but there are incoming connections");
}
- return;
}
}
}
@@ -411,6 +410,17 @@ std::string Processor::getInputRequirementAsString() const
{
return "ERROR_no_such_input_requirement";
}
+void Processor::setMaxConcurrentTasks(const uint8_t tasks) {
+ if (isSingleThreaded() && tasks > 1) {
+ logger_->log_warn("Processor %s can not be run in parallel, its \"max
concurrent tasks\" value is too high. "
+ "It was set to 1 from %" PRIu8 ".", name_, tasks);
+ max_concurrent_tasks_ = 1;
+ return;
+ }
+
+ max_concurrent_tasks_ = tasks;
+}
+
} // namespace core
} // namespace minifi
} // namespace nifi