This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f999498  MINIFICPP-1566 - Annotate maximum allowed threads for 
processors
f999498 is described below

commit f999498a11cb449b4185b7b4320896409cde4050
Author: Adam Markovics <[email protected]>
AuthorDate: Fri Oct 22 13:52:38 2021 +0200

    MINIFICPP-1566 - Annotate maximum allowed threads for processors
    
    Signed-off-by: Adam Debreceni <[email protected]>
    
    This closes #1191
---
 extensions/aws/processors/DeleteS3Object.h         |  4 ++
 extensions/aws/processors/FetchS3Object.h          |  4 ++
 extensions/aws/processors/PutS3Object.h            |  4 ++
 .../azure/processors/AzureStorageProcessorBase.h   |  1 -
 .../azure/processors/PutAzureBlobStorage.cpp       | 23 ++++-------
 extensions/azure/processors/PutAzureBlobStorage.h  |  8 ++++
 .../azure/processors/PutAzureDataLakeStorage.cpp   | 11 ++---
 .../azure/processors/PutAzureDataLakeStorage.h     |  4 ++
 extensions/opc/include/fetchopc.h                  |  2 -
 extensions/opc/include/opcbase.h                   |  4 ++
 extensions/opc/include/putopc.h                    |  3 --
 extensions/opc/src/fetchopc.cpp                    |  7 ----
 extensions/opc/src/putopc.cpp                      |  7 ----
 .../SourceInitiatedSubscriptionListener.h          |  4 ++
 extensions/pdh/PerformanceDataMonitor.h            |  8 ++++
 extensions/script/python/PythonCreator.h           |  1 +
 extensions/sftp/processors/ListSFTP.h              |  5 ++-
 extensions/sql/processors/SQLProcessor.cpp         |  2 -
 extensions/sql/processors/SQLProcessor.h           | 12 +++---
 .../standard-processors/processors/TailFile.cpp    |  5 ---
 .../standard-processors/processors/TailFile.h      |  5 ++-
 .../tests/unit/ManifestTests.cpp                   | 18 +++++----
 .../tests/unit/ProcessorTests.cpp                  | 47 ++++++++++++++++++++++
 extensions/systemd/ConsumeJournald.h               |  4 ++
 .../windows-event-log/ConsumeWindowsEventLog.cpp   | 13 +++---
 .../windows-event-log/ConsumeWindowsEventLog.h     |  5 ++-
 libminifi/include/agent/build_description.h        |  2 +
 libminifi/include/core/Connectable.h               |  2 +-
 libminifi/include/core/Processor.h                 | 13 +++---
 libminifi/include/core/ProcessorNode.h             |  2 +-
 .../include/core/state/nodes/AgentInformation.h    |  5 +++
 libminifi/src/core/Processor.cpp                   | 16 ++++++--
 32 files changed, 167 insertions(+), 84 deletions(-)

diff --git a/extensions/aws/processors/DeleteS3Object.h 
b/extensions/aws/processors/DeleteS3Object.h
index 5927d0c..e59b9f3 100644
--- a/extensions/aws/processors/DeleteS3Object.h
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -65,6 +65,10 @@ class DeleteS3Object : public S3Processor {
     return core::annotation::Input::INPUT_REQUIRED;
   }
 
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   friend class ::S3TestsFixture<DeleteS3Object>;
 
   explicit DeleteS3Object(const std::string& name, const 
minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> 
s3_request_sender)
diff --git a/extensions/aws/processors/FetchS3Object.h 
b/extensions/aws/processors/FetchS3Object.h
index 366a89a..14b1e98 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -94,6 +94,10 @@ class FetchS3Object : public S3Processor {
     return core::annotation::Input::INPUT_REQUIRED;
   }
 
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   friend class ::S3TestsFixture<FetchS3Object>;
 
   explicit FetchS3Object(const std::string& name, const 
minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> 
s3_request_sender)
diff --git a/extensions/aws/processors/PutS3Object.h 
b/extensions/aws/processors/PutS3Object.h
index 09f3f1e..b434bef 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -124,6 +124,10 @@ class PutS3Object : public S3Processor {
     return core::annotation::Input::INPUT_REQUIRED;
   }
 
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   friend class ::S3TestsFixture<PutS3Object>;
 
   explicit PutS3Object(const std::string& name, const 
minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> 
s3_request_sender)
diff --git a/extensions/azure/processors/AzureStorageProcessorBase.h 
b/extensions/azure/processors/AzureStorageProcessorBase.h
index 880f822..6017b85 100644
--- a/extensions/azure/processors/AzureStorageProcessorBase.h
+++ b/extensions/azure/processors/AzureStorageProcessorBase.h
@@ -51,7 +51,6 @@ class AzureStorageProcessorBase : public core::Processor {
 
   std::tuple<GetCredentialsFromControllerResult, 
std::optional<storage::AzureStorageCredentials>> 
getCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext> 
&context) const;
 
-  std::mutex azure_storage_mutex_;
   std::shared_ptr<logging::Logger> logger_;
 };
 
diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp 
b/extensions/azure/processors/PutAzureBlobStorage.cpp
index 194b76c..e5f66fd 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.cpp
+++ b/extensions/azure/processors/PutAzureBlobStorage.cpp
@@ -240,23 +240,16 @@ void PutAzureBlobStorage::onTrigger(const 
std::shared_ptr<core::ProcessContext>
     return;
   }
 
-  std::optional<storage::UploadBlobResult> upload_result;
-  {
-    // TODO(lordgamez): This can be removed after maximum allowed threads are 
implemented. See https://issues.apache.org/jira/browse/MINIFICPP-1566
-    // When used in multithreaded environment make sure to use the 
azure_storage_mutex_ to lock the wrapper so the
-    // client is not reset with different configuration while another thread 
is using it.
-    std::lock_guard<std::mutex> lock(azure_storage_mutex_);
-    if (create_container_) {
-      auto result = azure_blob_storage_.createContainerIfNotExists(*params);
-      if (!result) {
-        session->transfer(flow_file, Failure);
-        return;
-      }
+  if (create_container_) {
+    auto result = azure_blob_storage_.createContainerIfNotExists(*params);
+    if (!result) {
+      session->transfer(flow_file, Failure);
+      return;
     }
-    PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(), 
azure_blob_storage_, *params);
-    session->read(flow_file, &callback);
-    upload_result = callback.getResult();
   }
+  PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(), 
azure_blob_storage_, *params);
+  session->read(flow_file, &callback);
+  const std::optional<storage::UploadBlobResult> upload_result = 
callback.getResult();
 
   if (!upload_result) {
     logger_->log_error("Failed to upload blob '%s' to Azure Storage container 
'%s'", params->blob_name, params->container_name);
diff --git a/extensions/azure/processors/PutAzureBlobStorage.h 
b/extensions/azure/processors/PutAzureBlobStorage.h
index bfd71fd..6de322f 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.h
+++ b/extensions/azure/processors/PutAzureBlobStorage.h
@@ -100,6 +100,14 @@ class PutAzureBlobStorage final : public 
AzureStorageProcessorBase {
     , azure_blob_storage_(std::move(blob_storage_client)) {
   }
 
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   storage::AzureStorageCredentials getAzureCredentialsFromProperties(
     const std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::FlowFile> &flow_file) const;
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp 
b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index c07c3b6..d48998e 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -124,14 +124,9 @@ void PutAzureDataLakeStorage::onTrigger(const 
std::shared_ptr<core::ProcessConte
     return;
   }
 
-  storage::UploadDataLakeStorageResult result;
-  {
-    // TODO(lordgamez): This can be removed after maximum allowed threads are 
implemented. See https://issues.apache.org/jira/browse/MINIFICPP-1566
-    std::lock_guard<std::mutex> lock(azure_storage_mutex_);
-    PutAzureDataLakeStorage::ReadCallback callback(flow_file->getSize(), 
azure_data_lake_storage_, *params, logger_);
-    session->read(flow_file, &callback);
-    result = callback.getResult();
-  }
+  PutAzureDataLakeStorage::ReadCallback callback(flow_file->getSize(), 
azure_data_lake_storage_, *params, logger_);
+  session->read(flow_file, &callback);
+  const storage::UploadDataLakeStorageResult result = callback.getResult();
 
   if (result.result_code == storage::UploadResultCode::FILE_ALREADY_EXISTS) {
     gsl_Expects(conflict_resolution_strategy_ != 
FileExistsResolutionStrategy::REPLACE_FILE);
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h 
b/extensions/azure/processors/PutAzureDataLakeStorage.h
index d842cdc..c924cd6 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -88,6 +88,10 @@ class PutAzureDataLakeStorage final : public 
AzureStorageProcessorBase {
     return core::annotation::Input::INPUT_REQUIRED;
   }
 
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   explicit PutAzureDataLakeStorage(const std::string& name, const 
minifi::utils::Identifier& uuid, 
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
     : AzureStorageProcessorBase(name, uuid, 
logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger()),
       azure_data_lake_storage_(std::move(data_lake_storage_client)) {
diff --git a/extensions/opc/include/fetchopc.h 
b/extensions/opc/include/fetchopc.h
index ed391a3..fb4471f 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -21,7 +21,6 @@
 #include <memory>
 #include <string>
 #include <unordered_map>
-#include <mutex>
 #include <vector>
 
 #include "opc.h"
@@ -92,7 +91,6 @@ class FetchOPCProcessor : public BaseOPCProcessor {
   bool lazy_mode_;
 
  private:
-  std::mutex onTriggerMutex_;
   std::vector<UA_NodeId> translatedNodeIDs_;  // Only used when user provides 
path, path->nodeid translation is only done once
   std::unordered_map<std::string, std::string> node_timestamp_;  // Key = Full 
path, Value = Timestamp
 };
diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h
index ea4dc46..bebd765 100644
--- a/extensions/opc/include/opcbase.h
+++ b/extensions/opc/include/opcbase.h
@@ -54,6 +54,10 @@ class BaseOPCProcessor : public core::Processor {
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &factory) override;
 
  protected:
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   virtual bool reconnect();
 
   std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h
index 0497693..8c2180d 100644
--- a/extensions/opc/include/putopc.h
+++ b/extensions/opc/include/putopc.h
@@ -22,7 +22,6 @@
 #include <memory>
 #include <string>
 #include <vector>
-#include <mutex>
 
 #include "opc.h"
 #include "opcbase.h"
@@ -81,8 +80,6 @@ class PutOPCProcessor : public BaseOPCProcessor {
     std::shared_ptr<logging::Logger> logger_;
   };
 
-  std::mutex onTriggerMutex_;
-
   std::string nodeID_;
   int32_t nameSpaceIdx_;
   opc::OPCNodeIDType idType_;
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index cfb8c5d..cedade7 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -21,7 +21,6 @@
 #include <string>
 #include <list>
 #include <map>
-#include <mutex>
 #include <thread>
 
 #include "opc.h"
@@ -136,12 +135,6 @@ namespace processors {
   void FetchOPCProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
     logger_->log_trace("FetchOPCProcessor::onTrigger");
 
-    std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
-    if (!lock.owns_lock()) {
-      logger_->log_warn("processor was triggered before previous listing 
finished, configuration should be revised!");
-      return;
-    }
-
     if (!reconnect()) {
       yield();
       return;
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index 5936ce3..8e4368a 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -20,7 +20,6 @@
 #include <list>
 #include <map>
 #include <memory>
-#include <mutex>
 #include <string>
 #include <optional>
 #include <thread>
@@ -157,12 +156,6 @@ namespace processors {
   void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
     logger_->log_trace("PutOPCProcessor::onTrigger");
 
-    std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
-    if (!lock.owns_lock()) {
-      logger_->log_warn("processor was triggered before previous listing 
finished, configuration should be revised!");
-      return;
-    }
-
     if (!reconnect()) {
       yield();
       return;
diff --git 
a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h 
b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index e9f58d1..b00a2a4 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -113,6 +113,10 @@ class SourceInitiatedSubscriptionListener : public 
core::Processor {
   };
 
  protected:
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   std::shared_ptr<logging::Logger> logger_;
 
   std::shared_ptr<core::CoreComponentStateManager> state_manager_;
diff --git a/extensions/pdh/PerformanceDataMonitor.h 
b/extensions/pdh/PerformanceDataMonitor.h
index 3539741..b93c5cf 100644
--- a/extensions/pdh/PerformanceDataMonitor.h
+++ b/extensions/pdh/PerformanceDataMonitor.h
@@ -73,6 +73,14 @@ class PerformanceDataMonitor : public core::Processor {
     JSON, OPENTELEMETRY
   };
 
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_FORBIDDEN;
+  }
+
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   rapidjson::Value& prepareJSONBody(rapidjson::Document& root);
 
   void setupMembersFromProperties(const std::shared_ptr<core::ProcessContext>& 
context);
diff --git a/extensions/script/python/PythonCreator.h 
b/extensions/script/python/PythonCreator.h
index 4977fab..24d0abb 100644
--- a/extensions/script/python/PythonCreator.h
+++ b/extensions/script/python/PythonCreator.h
@@ -100,6 +100,7 @@ class PythonCreator : public minifi::core::CoreComponent {
     minifi::ClassDescription description(fullname);
     description.dynamic_properties_ = 
processor->getPythonSupportDynamicProperties();
     description.inputRequirement_ = processor->getInputRequirementAsString();
+    description.isSingleThreaded_ = processor->isSingleThreaded();
     auto properties = processor->getPythonProperties();
 
     minifi::AgentDocs::putDescription(scriptname, processor->getDescription());
diff --git a/extensions/sftp/processors/ListSFTP.h 
b/extensions/sftp/processors/ListSFTP.h
index d981d34..9967b38 100644
--- a/extensions/sftp/processors/ListSFTP.h
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -60,7 +60,7 @@ class ListSFTP : public SFTPProcessorBase {
    * Create a new processor
    */
   explicit ListSFTP(const std::string& name, const utils::Identifier& uuid = 
{});
-  virtual ~ListSFTP();
+  ~ListSFTP() override;
 
   // Supported Properties
   static core::Property ListingStrategy;
@@ -102,6 +102,9 @@ class ListSFTP : public SFTPProcessorBase {
     return core::annotation::Input::INPUT_FORBIDDEN;
   }
 
+  bool isSingleThreaded() const override {
+    return true;
+  }
 
   std::shared_ptr<core::CoreComponentStateManager> state_manager_;
   std::string listing_strategy_;
diff --git a/extensions/sql/processors/SQLProcessor.cpp 
b/extensions/sql/processors/SQLProcessor.cpp
index 5238a1c..b9197f7 100644
--- a/extensions/sql/processors/SQLProcessor.cpp
+++ b/extensions/sql/processors/SQLProcessor.cpp
@@ -52,8 +52,6 @@ void SQLProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext>& conte
 }
 
 void SQLProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
-  std::lock_guard<std::mutex> guard(on_trigger_mutex_);
-
   try {
     if (!connection_) {
       connection_ = db_service_->getConnection();
diff --git a/extensions/sql/processors/SQLProcessor.h 
b/extensions/sql/processors/SQLProcessor.h
index 6def80d..8da0de5 100644
--- a/extensions/sql/processors/SQLProcessor.h
+++ b/extensions/sql/processors/SQLProcessor.h
@@ -42,6 +42,10 @@ class SQLProcessor: public core::Processor {
     : core::Processor(name, uuid), logger_(std::move(logger)) {
   }
 
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   static std::vector<std::string> collectArguments(const 
std::shared_ptr<core::FlowFile>& flow_file);
 
   virtual void processOnSchedule(core::ProcessContext& context) = 0;
@@ -55,11 +59,9 @@ class SQLProcessor: public core::Processor {
     connection_.reset();
   }
 
- protected:
-   std::shared_ptr<logging::Logger> logger_;
-   std::shared_ptr<sql::controllers::DatabaseService> db_service_;
-   std::unique_ptr<sql::Connection> connection_;
-   std::mutex on_trigger_mutex_;
+  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<sql::controllers::DatabaseService> db_service_;
+  std::unique_ptr<sql::Connection> connection_;
 };
 
 }  // namespace processors
diff --git a/extensions/standard-processors/processors/TailFile.cpp 
b/extensions/standard-processors/processors/TailFile.cpp
index 1dec544..6180a2b 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -33,7 +33,6 @@
 #include <regex>
 
 #include "range/v3/action/sort.hpp"
-#include "range/v3/algorithm/transform.hpp"
 
 #include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
@@ -335,8 +334,6 @@ void TailFile::initialize() {
 }
 
 void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory>& 
/*sessionFactory*/) {
-  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
-
   tail_states_.clear();
 
   state_manager_ = context->getStateManager();
@@ -671,8 +668,6 @@ std::vector<TailState> 
TailFile::sortAndSkipMainFilePrefix(const TailState &stat
 }
 
 void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &, const 
std::shared_ptr<core::ProcessSession> &session) {
-  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
-
   if (tail_mode_ == Mode::MULTIPLE) {
     if (last_multifile_lookup_ + lookup_frequency_ < 
std::chrono::steady_clock::now()) {
       logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing 
new multifile lookup", int64_t{lookup_frequency_.count()});
diff --git a/extensions/standard-processors/processors/TailFile.h 
b/extensions/standard-processors/processors/TailFile.h
index 4d85505..dc2048c 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -140,6 +140,10 @@ class TailFile : public core::Processor {
     return core::annotation::Input::INPUT_FORBIDDEN;
   }
 
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   void parseStateFileLine(char *buf, std::map<std::string, TailState> &state) 
const;
   void processAllRotatedFiles(const std::shared_ptr<core::ProcessSession> 
&session, TailState &state);
   void processRotatedFiles(const std::shared_ptr<core::ProcessSession> 
&session, TailState &state, std::vector<TailState> &rotated_file_states);
@@ -170,7 +174,6 @@ class TailFile : public core::Processor {
   static const char *POSITION_STR;
   static const int BUFFER_SIZE = 512;
 
-  std::mutex tail_file_mutex_;
   std::string delimiter_;  // Delimiter for the data incoming from the tailed 
file.
   std::shared_ptr<core::CoreComponentStateManager> state_manager_;
   std::map<std::string, TailState> tail_states_;
diff --git a/extensions/standard-processors/tests/unit/ManifestTests.cpp 
b/extensions/standard-processors/tests/unit/ManifestTests.cpp
index efe4b24..d6dfc47 100644
--- a/extensions/standard-processors/tests/unit/ManifestTests.cpp
+++ b/extensions/standard-processors/tests/unit/ManifestTests.cpp
@@ -17,12 +17,9 @@
  */
 
 #include <memory>
-#include <algorithm>
-#include "core/Processor.h"
 #include "core/state/nodes/DeviceInformation.h"
 #include "core/state/nodes/AgentInformation.h"
 #include "TestBase.h"
-#include "io/ClientSocket.h"
 #include "core/ClassLoader.h"
 
 // Include some processor headers to make sure they are part of the manifest
@@ -107,8 +104,17 @@ TEST_CASE("Test Relationships", "[rel1]") {
     }
   }
 #ifndef WIN32
+  const auto& inputRequirement = proc_0.children[1];
+  REQUIRE(inputRequirement.name == "inputRequirement");
+  REQUIRE(inputRequirement.value.to_string() == "INPUT_REQUIRED");
+
+  const auto& isSingleThreaded = proc_0.children[2];
+  REQUIRE(isSingleThreaded.name == "isSingleThreaded");
+  REQUIRE(isSingleThreaded.value.getValue()->getTypeIndex() == 
org::apache::nifi::minifi::state::response::Value::BOOL_TYPE);
+  REQUIRE(isSingleThreaded.value.to_string() == "false");
+
   REQUIRE(proc_0.children.size() > 0);
-  const auto& relationships = proc_0.children[2];
+  const auto& relationships = proc_0.children[3];
   REQUIRE("supportedRelationships" == relationships.name);
   // this is because they are now nested
   REQUIRE("supportedRelationships" == relationships.children[0].name);
@@ -118,10 +124,6 @@ TEST_CASE("Test Relationships", "[rel1]") {
 
   REQUIRE("success" == 
relationships.children[1].children[0].value.to_string());
   REQUIRE("description" == relationships.children[1].children[1].name);
-
-  const auto& inputRequirement = proc_0.children[1];
-  REQUIRE("inputRequirement" == inputRequirement.name);
-  REQUIRE("INPUT_REQUIRED" == inputRequirement.value.to_string());
 #endif
 }
 
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp 
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 30b0344..0242704 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -779,3 +779,50 @@ TEST_CASE("InputRequirementTestRequired", 
"[InputRequirement]") {
   REQUIRE_THROWS_WITH(plan->validateAnnotations(), 
Catch::EndsWith("INPUT_REQUIRED was specified for the processor, but no 
incoming connections were found"));
   testController.runSession(plan);
 }
+
+TEST_CASE("isSingleThreaded - one thread for a multithreaded processor", 
"[isSingleThreaded]") {
+  TestController testController;
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  auto processor = plan->addProcessor("GenerateFlowFile", "myProc");
+  // default max concurrent tasks value is 1 for every processor
+
+  REQUIRE_NOTHROW(plan->validateAnnotations());
+  REQUIRE(processor->getMaxConcurrentTasks() == 1);
+}
+
+TEST_CASE("isSingleThreaded - two threads for a multithreaded processor", 
"[isSingleThreaded]") {
+  TestController testController;
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  auto processor = plan->addProcessor("GenerateFlowFile", "myProc");
+  processor->setMaxConcurrentTasks(2);
+
+  REQUIRE_NOTHROW(plan->validateAnnotations());
+  REQUIRE(processor->getMaxConcurrentTasks() == 2);
+}
+
+TEST_CASE("isSingleThreaded - one thread for a single threaded processor", 
"[isSingleThreaded]") {
+  TestController testController;
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  auto processor = plan->addProcessor("TailFile", "myProc");
+  // default max concurrent tasks value is 1 for every processor
+
+  REQUIRE_NOTHROW(plan->validateAnnotations());
+  REQUIRE(processor->getMaxConcurrentTasks() == 1);
+}
+
+TEST_CASE("isSingleThreaded - two threads for a single threaded processor", 
"[isSingleThreaded]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<minifi::core::Processor>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  auto processor = plan->addProcessor("TailFile", "myProc");
+  processor->setMaxConcurrentTasks(2);
+
+  REQUIRE_NOTHROW(plan->validateAnnotations());
+  REQUIRE(processor->getMaxConcurrentTasks() == 1);
+  REQUIRE(LogTestController::getInstance().contains("[warning] Processor 
myProc can not be run in parallel, its "
+                                                    "\"max concurrent tasks\" 
value is too high. It was set to 1 from 2."));
+}
diff --git a/extensions/systemd/ConsumeJournald.h 
b/extensions/systemd/ConsumeJournald.h
index 25f86c9..ed48432 100644
--- a/extensions/systemd/ConsumeJournald.h
+++ b/extensions/systemd/ConsumeJournald.h
@@ -83,6 +83,10 @@ class ConsumeJournald final : public core::Processor {
     std::chrono::system_clock::time_point timestamp;
   };
 
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_FORBIDDEN;
+  }
+
   static std::optional<gsl::span<const char>> 
enumerateJournalEntry(libwrapper::Journal&);
   static std::optional<journal_field> getNextField(libwrapper::Journal&);
   std::future<std::pair<std::string, std::vector<journal_message>>> 
getCursorAndMessageBatch();
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp 
b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index b7e13a8..340b9e3 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -383,18 +383,18 @@ std::tuple<size_t, std::wstring> 
ConsumeWindowsEventLog::processEventLogs(const
 }
 
 void ConsumeWindowsEventLog::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
-  if (!bookmark_) {
-    logger_->log_debug("bookmark_ is null");
-    context->yield();
-    return;
-  }
-
   std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing 
finished, configuration should be revised!");
     return;
   }
 
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
+    context->yield();
+    return;
+  }
+
   logger_->log_trace("CWEL onTrigger");
 
   size_t processed_event_count = 0;
@@ -439,7 +439,6 @@ void ConsumeWindowsEventLog::onTrigger(const 
std::shared_ptr<core::ProcessContex
 }
 
 wel::WindowsEventLogHandler ConsumeWindowsEventLog::getEventLogHandler(const 
std::string & name) {
-  std::lock_guard<std::mutex> lock(cache_mutex_);
   logger_->log_trace("Getting Event Log Handler corresponding to %s", 
name.c_str());
   auto provider = providers_.find(name);
   if (provider != std::end(providers_)) {
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h 
b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index 0283176..ea26a4d 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -137,6 +137,10 @@ class ConsumeWindowsEventLog : public core::Processor {
     return core::annotation::Input::INPUT_FORBIDDEN;
   }
 
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
   bool commitAndSaveBookmark(const std::wstring &bookmarkXml, const 
std::shared_ptr<core::ProcessSession> &session);
   std::tuple<size_t, std::wstring> processEventLogs(const 
std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE& 
event_query_results);
@@ -156,7 +160,6 @@ class ConsumeWindowsEventLog : public core::Processor {
   std::string computerName_;
   uint64_t maxBufferSize_{};
   DWORD lastActivityTimestamp_{};
-  std::mutex cache_mutex_;
   std::map<std::string, wel::WindowsEventLogHandler > providers_;
   uint64_t batch_commit_size_{};
 
diff --git a/libminifi/include/agent/build_description.h 
b/libminifi/include/agent/build_description.h
index d6810b9..ba4f967 100644
--- a/libminifi/include/agent/build_description.h
+++ b/libminifi/include/agent/build_description.h
@@ -60,6 +60,7 @@ class ClassDescription {
   std::vector<core::Relationship> class_relationships_;
   bool dynamic_properties_ = false;
   std::string inputRequirement_;
+  bool isSingleThreaded_ = false;
   bool dynamic_relationships_ = false;
   bool is_controller_service_ = false;
 };
@@ -151,6 +152,7 @@ class BuildDescription {
           description.dynamic_relationships_ = 
component->supportsDynamicRelationships();
           if (is_processor) {
             description.inputRequirement_ = 
processor->getInputRequirementAsString();
+            description.isSingleThreaded_ = processor->isSingleThreaded();
             description.class_relationships_ = 
processor->getSupportedRelationships();
             class_mappings[group].processors_.emplace_back(description);
           } else if (is_controller_service) {
diff --git a/libminifi/include/core/Connectable.h 
b/libminifi/include/core/Connectable.h
index 6b44c68..f77c24b 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -110,7 +110,7 @@ class Connectable : public CoreComponent {
     return max_concurrent_tasks_;
   }
 
-  void setMaxConcurrentTasks(const uint8_t tasks) {
+  virtual void setMaxConcurrentTasks(uint8_t tasks) {
     max_concurrent_tasks_ = tasks;
   }
   /**
diff --git a/libminifi/include/core/Processor.h 
b/libminifi/include/core/Processor.h
index b058feb..5855378 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -144,13 +144,14 @@ class Processor : public Connectable, public 
ConfigurableComponent, public std::
   }
 
   // Set Processor Maximum Concurrent Tasks
-  void setMaxConcurrentTasks(uint8_t tasks) {
-    max_concurrent_tasks_ = tasks;
-  }
-  // Get Processor Maximum Concurrent Tasks
-  uint8_t getMaxConcurrentTasks() const {
-    return (max_concurrent_tasks_);
+  void setMaxConcurrentTasks(uint8_t tasks) override;
+
+  // Overriding to yield true can be used to indicate that the Processor is 
not safe for concurrent execution
+  // of its onTrigger() method. By default, Processors are assumed to be safe 
for concurrent execution.
+  virtual bool isSingleThreaded() const {
+    return false;
   }
+
   // Set Trigger when empty
   void setTriggerWhenEmpty(bool value) {
     _triggerWhenEmpty = value;
diff --git a/libminifi/include/core/ProcessorNode.h 
b/libminifi/include/core/ProcessorNode.h
index 557cc23..a0ee009 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -274,7 +274,7 @@ class ProcessorNode : public ConfigurableComponent, public 
Connectable {
     return processor_->getMaxConcurrentTasks();
   }
 
-  void setMaxConcurrentTasks(const uint8_t tasks) {
+  void setMaxConcurrentTasks(uint8_t tasks) override {
     processor_->setMaxConcurrentTasks(tasks);
   }
 
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h 
b/libminifi/include/core/state/nodes/AgentInformation.h
index f804672..0231d22 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -254,6 +254,10 @@ class ComponentManifest : public DeviceInformation {
           inputReq.name = "inputRequirement";
           inputReq.value = group.inputRequirement_;
 
+          SerializedResponseNode isSingleThreaded;
+          isSingleThreaded.name = "isSingleThreaded";
+          isSingleThreaded.value = group.isSingleThreaded_;
+
           SerializedResponseNode relationships;
           relationships.name = "supportedRelationships";
           relationships.array = true;
@@ -275,6 +279,7 @@ class ComponentManifest : public DeviceInformation {
             relationships.children.push_back(child);
           }
           desc.children.push_back(inputReq);
+          desc.children.push_back(isSingleThreaded);
           desc.children.push_back(relationships);
         }
 
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index ddff35a..744e1a3 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -385,15 +385,14 @@ void Processor::validateAnnotations() const {
       if (!hasIncomingConnections()) {
         throw Exception(PROCESS_SCHEDULE_EXCEPTION, "INPUT_REQUIRED was 
specified for the processor, but no incoming connections were found");
       }
-      return;
+      break;
     }
     case annotation::Input::INPUT_ALLOWED:
-      return;
+      break;
     case annotation::Input::INPUT_FORBIDDEN: {
       if (hasIncomingConnections()) {
         throw Exception(PROCESS_SCHEDULE_EXCEPTION, "INPUT_FORBIDDEN was 
specified for the processor, but there are incoming connections");
       }
-      return;
     }
   }
 }
@@ -411,6 +410,17 @@ std::string Processor::getInputRequirementAsString() const 
{
   return "ERROR_no_such_input_requirement";
 }
 
+void Processor::setMaxConcurrentTasks(const uint8_t tasks) {
+  if (isSingleThreaded() && tasks > 1) {
+    logger_->log_warn("Processor %s can not be run in parallel, its \"max 
concurrent tasks\" value is too high. "
+                      "It was set to 1 from %" PRIu8 ".", name_, tasks);
+    max_concurrent_tasks_ = 1;
+    return;
+  }
+
+  max_concurrent_tasks_ = tasks;
+}
+
 }  // namespace core
 }  // namespace minifi
 }  // namespace nifi

Reply via email to