This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f4ec6d8ca MINIFICPP-2502 Add processorBulletins C2 metric node to FlowInformation
f4ec6d8ca is described below

commit f4ec6d8caca2ec8c7961e82c39e30c409df5ec38
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon May 19 12:32:07 2025 +0200

    MINIFICPP-2502 Add processorBulletins C2 metric node to FlowInformation
    
    Closes #1920
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 C2.md                                              |  31 +++++
 conf/minifi.properties                             |   3 +
 extensions/python/ExecutePythonProcessor.cpp       |   9 +-
 extensions/python/ExecutePythonProcessor.h         |   2 +
 .../tests/unit/FlowJsonTests.cpp                   |   2 +
 .../tests/unit/YamlConfigurationTests.cpp          |   2 +
 libminifi/include/core/BulletinStore.h             |  70 +++++++++++
 libminifi/include/core/FlowConfiguration.h         |   3 +
 libminifi/include/core/ProcessGroup.h              |  37 +-----
 libminifi/include/core/ProcessorConfig.h           |  62 ----------
 libminifi/include/core/flow/FlowSchema.h           |   1 +
 .../include/core/flow/StructuredConfiguration.h    |   2 +-
 libminifi/include/core/logging/LoggerBase.h        |  32 +++++
 .../include/core/state/MetricsPublisherStore.h     |   3 +-
 .../include/core/state/nodes/FlowInformation.h     |   6 +
 .../include/core/state/nodes/ResponseNodeLoader.h  |   4 +-
 .../include/core/state/nodes/SchedulingNodes.h     |   2 +-
 libminifi/include/core/yaml/YamlConfiguration.h    |   2 +-
 libminifi/src/Configuration.cpp                    |   1 +
 libminifi/src/core/BulletinStore.cpp               |  80 +++++++++++++
 libminifi/src/core/FlowConfiguration.cpp           |   1 +
 libminifi/src/core/ProcessGroup.cpp                |  13 +++
 libminifi/src/core/flow/FlowSchema.cpp             |   2 +
 .../src/core/flow/StructuredConfiguration.cpp      |  17 +++
 libminifi/src/core/logging/LoggerBase.cpp          |   7 ++
 libminifi/src/core/state/MetricsPublisherStore.cpp |   4 +-
 libminifi/src/core/state/nodes/FlowInformation.cpp |  27 +++++
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |   9 +-
 libminifi/test/integration/C2MetricsTest.cpp       |  27 ++++-
 .../test/libtest/integration/IntegrationBase.cpp   |   6 +-
 .../test/libtest/integration/IntegrationBase.h     |   2 +
 libminifi/test/resources/TestC2Metrics.yml         |   2 +
 libminifi/test/unit/BulletinStoreTests.cpp         | 129 +++++++++++++++++++++
 minifi-api/include/minifi-cpp/core/Processor.h     |   8 ++
 .../include/minifi-cpp/core/ProcessorConfig.h      |   1 +
 .../include/minifi-cpp/core/logging/Logger.h       |   2 +
 .../include/minifi-cpp/properties/Configuration.h  |   1 +
 minifi_main/MiNiFiMain.cpp                         |   8 +-
 utils/include/core/Processor.h                     |  29 +++++
 utils/include/utils/TimeUtil.h                     |   4 +
 utils/src/core/Processor.cpp                       |   5 +-
 41 files changed, 541 insertions(+), 117 deletions(-)

diff --git a/C2.md b/C2.md
index c990829d4..244e0e887 100644
--- a/C2.md
+++ b/C2.md
@@ -115,6 +115,9 @@ be requested via C2 DESCRIBE manifest command.
     # minimize REST heartbeat updates
     #nifi.c2.rest.heartbeat.minimize.updates=true
 
+    # specify the maximum number of bulletins to send in a heartbeat
+    # nifi.c2.flow.info.processor.bulletin.limit=1000
+
 #### Flow Id and URL
 
 Flow id and URL are usually retrieved from the C2 server. These identify the last updated flow version and where the flow was downloaded from. These properties are persisted in the minifi.properties file.
@@ -194,6 +197,20 @@ configuration produces the following JSON:
                       "uuid": "2438e3c8-015a-1000-79ca-83af40ec1997"
                   }
               },
+              "processorBulletins": [
+                {
+                    "id": 1,
+                    "timestamp": "Mon Jan 27 12:10:47 UTC 2025",
+                    "level": "ERROR",
+                    "category": "Log Message",
+                    "message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)",
+                    "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+                    "groupName": "MiNiFi Flow",
+                    "groupPath": "MiNiFi Flow",
+                    "sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991",
+                    "sourceName": "GetTCP"
+                }
+              ],
               "processorStatuses": [
                 {
                     "id": "5128e3c8-015a-1000-79ca-83af40ec1990",
@@ -536,6 +553,20 @@ Contains information about the flow the agent is running, including the versione
             "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332"
         }
     },
+    "processorBulletins": [
+        {
+            "id": 1,
+            "timestamp": "Mon Jan 27 12:10:47 UTC 2025",
+            "level": "ERROR",
+            "category": "Log Message",
+            "message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)",
+            "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+            "groupName": "MiNiFi Flow",
+            "groupPath": "MiNiFi Flow",
+            "sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991",
+            "sourceName": "GetTCP"
+        }
+    ],
     "processorStatuses": [
         {
             "id": "5128e3c8-015a-1000-79ca-83af40ec1990",
diff --git a/conf/minifi.properties b/conf/minifi.properties
index b327daaa3..ef5704fcd 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -120,6 +120,9 @@ nifi.c2.full.heartbeat=false
 # specify encoding strategy for c2 requests (gzip, none)
 #nifi.c2.rest.request.encoding=none
 
+# specify the maximum number of bulletins to send in a heartbeat
+#nifi.c2.flow.info.processor.bulletin.limit=1000
+
 ## enable the controller socket provider on port 9998
 ## off by default.
 #controller.socket.enable=true
diff --git a/extensions/python/ExecutePythonProcessor.cpp 
b/extensions/python/ExecutePythonProcessor.cpp
index 707e23806..836c6583b 100644
--- a/extensions/python/ExecutePythonProcessor.cpp
+++ b/extensions/python/ExecutePythonProcessor.cpp
@@ -158,10 +158,7 @@ void 
ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty() {
 
 std::unique_ptr<PythonScriptEngine> 
ExecutePythonProcessor::createScriptEngine() {
   auto engine = std::make_unique<PythonScriptEngine>();
-
-  python_logger_ = 
core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
   engine->initialize(Success, Failure, Original, python_logger_);
-
   return engine;
 }
 
@@ -283,6 +280,12 @@ std::vector<core::Relationship> 
ExecutePythonProcessor::getPythonRelationships()
   return relationships;
 }
 
+void ExecutePythonProcessor::setLoggerCallback(const 
std::function<void(core::logging::LOG_LEVEL level, const std::string& 
message)>& callback) {
+  gsl_Expects(logger_ && python_logger_);
+  logger_->setLogCallback(callback);
+  python_logger_->setLogCallback(callback);
+}
+
 REGISTER_RESOURCE(ExecutePythonProcessor, Processor);
 
 }  // namespace org::apache::nifi::minifi::extensions::python::processors
diff --git a/extensions/python/ExecutePythonProcessor.h 
b/extensions/python/ExecutePythonProcessor.h
index 9c0138565..b9e12da07 100644
--- a/extensions/python/ExecutePythonProcessor.h
+++ b/extensions/python/ExecutePythonProcessor.h
@@ -46,6 +46,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
         python_dynamic_(false),
         reload_on_script_change_(true) {
     logger_ = 
core::logging::LoggerFactory<ExecutePythonProcessor>::getLogger(uuid_);
+    python_logger_ = 
core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
   }
 
   EXTENSIONAPI static constexpr const char* Description = "Executes a script 
given the flow file and a process session. "
@@ -141,6 +142,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
   nonstd::expected<core::Property, std::error_code> 
getSupportedProperty(std::string_view name) const override;
 
   std::vector<core::Relationship> getPythonRelationships() const;
+  void setLoggerCallback(const std::function<void(core::logging::LOG_LEVEL 
level, const std::string& message)>& callback) override;
 
   nonstd::expected<std::string, std::error_code> getProperty(std::string_view 
name) const override;
   nonstd::expected<void, std::error_code> setProperty(std::string_view name, 
std::string value) override;
diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp 
b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
index 5ef3abf55..88a3fd1f8 100644
--- a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
+++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
@@ -79,6 +79,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") {
       "schedulingPeriod": "3 sec",
       "penaltyDuration": "12 sec",
       "yieldDuration": "4 sec",
+      "bulletinLevel": "ERROR",
       "runDurationMillis": 12,
       "autoTerminatedRelationships": ["one", "two"],
       "properties": {
@@ -148,6 +149,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") {
   CHECK(3s == proc->getSchedulingPeriod());
   CHECK(12s == proc->getPenalizationPeriod());
   CHECK(4s == proc->getYieldPeriod());
+  CHECK(proc->getLogBulletinLevel() == logging::LOG_LEVEL::err);
   CHECK(proc->isAutoTerminated({"one", ""}));
   CHECK(proc->isAutoTerminated({"two", ""}));
   CHECK_FALSE(proc->isAutoTerminated({"three", ""}));
diff --git 
a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp 
b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index 4e2a31485..72746076d 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -102,6 +102,7 @@ Processors:
       scheduling period: 1 sec
       penalization period: 30 sec
       yield period: 1 sec
+      bulletin level: ERROR
       run duration nanos: 0
       auto-terminated relationships list:
       Properties:
@@ -159,6 +160,7 @@ Provenance Reporting:
     REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriod());
     REQUIRE(30s == 
rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
     REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriod());
+    
REQUIRE(rootFlowConfig->findProcessorByName("TailFile")->getLogBulletinLevel() 
== logging::LOG_LEVEL::err);
     REQUIRE(0s == 
rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
 
     std::map<std::string, minifi::Connection*> connectionMap;
diff --git a/libminifi/include/core/BulletinStore.h 
b/libminifi/include/core/BulletinStore.h
new file mode 100644
index 000000000..372601454
--- /dev/null
+++ b/libminifi/include/core/BulletinStore.h
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <deque>
+#include <mutex>
+#include <memory>
+#include <optional>
+#include <chrono>
+
+#include "properties/Configure.h"
+#include "core/logging/LoggerFactory.h"
+#include "core/Processor.h"
+
+namespace org::apache::nifi::minifi {
+namespace test {
+class BulletinStoreTestAccessor;
+}  // namespace test
+
+namespace core {
+
+struct Bulletin {
+  uint64_t id = 0;
+  std::chrono::time_point<std::chrono::system_clock> timestamp;
+  std::string level;
+  std::string category;
+  std::string message;
+  std::string group_id;
+  std::string group_name;
+  std::string group_path;
+  std::string source_id;
+  std::string source_name;
+};
+
+class BulletinStore {
+ public:
+  explicit BulletinStore(const Configure& configure);
+  void addProcessorBulletin(const core::Processor& processor, 
core::logging::LOG_LEVEL log_level, const std::string& message);
+  std::deque<Bulletin> 
getBulletins(std::optional<std::chrono::system_clock::duration> 
time_interval_to_include = {}) const;
+  size_t getMaxBulletinCount() const;
+
+ private:
+  friend class minifi::test::BulletinStoreTestAccessor;
+
+  static constexpr size_t DEFAULT_BULLETIN_COUNT = 1000;
+  size_t max_bulletin_count_;
+  mutable std::mutex mutex_;
+  uint64_t id_counter = 1;
+  std::deque<Bulletin> bulletins_;
+  std::shared_ptr<logging::Logger> 
logger_{logging::LoggerFactory<BulletinStore>::getLogger()};
+};
+
+}  // namespace core
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/FlowConfiguration.h 
b/libminifi/include/core/FlowConfiguration.h
index 331856397..1145601f2 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -44,6 +44,7 @@
 #include "utils/ChecksumCalculator.h"
 #include "ParameterContext.h"
 #include "ParameterProvider.h"
+#include "core/BulletinStore.h"
 
 namespace org::apache::nifi::minifi::core {
 
@@ -63,6 +64,7 @@ struct ConfigurationContext {
   std::shared_ptr<utils::file::FileSystem> 
filesystem{std::make_shared<utils::file::FileSystem>()};
   std::optional<utils::crypto::EncryptionProvider> 
sensitive_values_encryptor{std::nullopt};
   utils::file::AssetManager* asset_manager{nullptr};
+  core::BulletinStore* bulletin_store{nullptr};
 };
 
 enum class FlowSerializationType { Json, NifiJson, Yaml };
@@ -155,6 +157,7 @@ class FlowConfiguration : public CoreComponentImpl {
   utils::crypto::EncryptionProvider sensitive_values_encryptor_;
   utils::ChecksumCalculator checksum_calculator_;
   utils::file::AssetManager* asset_manager_{nullptr};
+  core::BulletinStore* bulletin_store_{nullptr};
 
  private:
   virtual std::string serialize(const ProcessGroup&) { return ""; }
diff --git a/libminifi/include/core/ProcessGroup.h 
b/libminifi/include/core/ProcessGroup.h
index bb3b018b7..1a267b04e 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -73,17 +73,13 @@ class ProcessGroup : public CoreComponentImpl {
   ProcessGroup(ProcessGroupType type, std::string_view name);
   ProcessGroup(ProcessGroupType type, std::string_view name, const 
utils::Identifier& uuid);
   ProcessGroup(ProcessGroupType type, std::string_view name, const 
utils::Identifier& uuid, int version);
-  // Destructor
   ~ProcessGroup() override;
-  // Set URL
   void setURL(std::string url) {
     url_ = std::move(url);
   }
-  // Get URL
   std::string getURL() {
     return (url_);
   }
-  // SetTransmitting
   void setTransmitting(bool val) {
     transmitting_ = val;
   }
@@ -93,7 +89,6 @@ class ProcessGroup : public CoreComponentImpl {
   uint64_t getTimeout() {
     return timeout_;
   }
-  // setInterface
   void setInterface(std::string &ifc) {
     local_network_interface_ = ifc;
   }
@@ -124,11 +119,9 @@ class ProcessGroup : public CoreComponentImpl {
   http::HTTPProxy getHTTPProxy() {
     return proxy_;
   }
-  // Set Processor yield period in MilliSecond
   void setYieldPeriodMsec(std::chrono::milliseconds period) {
     yield_period_msec_ = period;
   }
-  // Get Processor yield period in MilliSecond
   std::chrono::milliseconds getYieldPeriodMsec() {
     return (yield_period_msec_);
   }
@@ -147,13 +140,11 @@ class ProcessGroup : public CoreComponentImpl {
                       const std::function<bool(const Processor*)>& filter = 
nullptr);
 
   bool isRemoteProcessGroup();
-  // set parent process group
   void setParent(ProcessGroup *parent) {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
     parent_process_group_ = parent;
   }
-  // get parent process group
-  ProcessGroup *getParent() {
+  ProcessGroup *getParent() const {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
     return parent_process_group_;
   }
@@ -185,25 +176,17 @@ class ProcessGroup : public CoreComponentImpl {
     }
     return nullptr;
   }
-  // findProcessor based on UUID
   Processor* findProcessorById(const utils::Identifier& uuid, Traverse 
traverse = Traverse::IncludeChildren) const;
-  // findProcessor based on name
   Processor* findProcessorByName(const std::string &processorName, Traverse 
traverse = Traverse::IncludeChildren) const;
 
   void getAllProcessors(std::vector<Processor*>& processor_vec) const;
 
-  /**
-   * Add controller service
-   * @param nodeId node identifier
-   * @param node controller service node.
-   */
   void addControllerService(const std::string &nodeId, const 
std::shared_ptr<core::controller::ControllerServiceNode> &node);
 
   core::controller::ControllerServiceNode* findControllerService(const 
std::string &nodeId, Traverse traverse = Traverse::ExcludeChildren) const;
 
   std::vector<const core::controller::ControllerServiceNode*> 
getAllControllerServices() const;
 
-  // update property value
   void updatePropertyValue(const std::string& processorName, const 
std::string& propertyName, const std::string& propertyValue);
 
   void getConnections(std::map<std::string, Connection*>& connectionMap);
@@ -228,45 +211,29 @@ class ProcessGroup : public CoreComponentImpl {
  protected:
   void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, 
EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& 
cronScheduler);
 
-  // version
   int config_version_;
-  // Process Group Type
   const ProcessGroupType type_;
-  // Processors (ProcessNode) inside this process group which include Remote 
Process Group input/Output port
   std::set<std::unique_ptr<Processor>> processors_;
   std::set<Processor*> failed_processors_;
   std::set<Port*> ports_;
   std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
-  // Connections between the processor inside the group;
   std::set<std::unique_ptr<Connection>> connections_;
-  // Parent Process Group
   ProcessGroup* parent_process_group_;
-  // Yield Period in Milliseconds
   std::atomic<std::chrono::milliseconds> yield_period_msec_;
   std::atomic<uint64_t> timeout_;
-
-  // URL
   std::string url_;
-  // local network interface
   std::string local_network_interface_;
-  // Transmitting
   std::atomic<bool> transmitting_;
-  // http proxy
   http::HTTPProxy proxy_;
   std::string transport_protocol_;
-
-  // controller services
-
   core::controller::ControllerServiceNodeMap controller_service_map_;
-
   ParameterContext* parameter_context_ = nullptr;
 
  private:
   static Port* findPortById(const std::set<Port*>& ports, const 
utils::Identifier& uuid);
+  std::string buildGroupPath() const;
 
-  // Mutex for protection
   mutable std::recursive_mutex mutex_;
-  // Logger
   std::shared_ptr<logging::Logger> logger_;
 
   ProcessGroup(const ProcessGroup &parent);
diff --git a/libminifi/include/core/ProcessorConfig.h 
b/libminifi/include/core/ProcessorConfig.h
deleted file mode 100644
index e4706204c..000000000
--- a/libminifi/include/core/ProcessorConfig.h
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
-#define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
-
-#include <string>
-#include <vector>
-
-#include "core/Core.h"
-#include "core/Property.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-
-constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"};
-constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"};
-constexpr std::chrono::milliseconds DEFAULT_SCHEDULING_PERIOD_MILLIS{1000};
-constexpr std::chrono::nanoseconds DEFAULT_RUN_DURATION{0};
-constexpr int DEFAULT_MAX_CONCURRENT_TASKS{1};
-constexpr std::chrono::seconds DEFAULT_YIELD_PERIOD_SECONDS{1};
-constexpr std::chrono::seconds DEFAULT_PENALIZATION_PERIOD{30};
-
-struct ProcessorConfig {
-  std::string id;
-  std::string name;
-  std::string javaClass;
-  std::string maxConcurrentTasks;
-  std::string schedulingStrategy;
-  std::string schedulingPeriod;
-  std::string penalizationPeriod;
-  std::string yieldPeriod;
-  std::string runDurationNanos;
-  std::vector<std::string> autoTerminatedRelationships;
-  std::vector<core::Property> properties;
-  std::string parameterContextName;
-};
-
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
diff --git a/libminifi/include/core/flow/FlowSchema.h 
b/libminifi/include/core/flow/FlowSchema.h
index e751d7bff..ebbb2ab26 100644
--- a/libminifi/include/core/flow/FlowSchema.h
+++ b/libminifi/include/core/flow/FlowSchema.h
@@ -35,6 +35,7 @@ struct FlowSchema {
   Keys max_concurrent_tasks;
   Keys penalization_period;
   Keys proc_yield_period;
+  Keys bulletin_level;
   Keys runduration_nanos;
 
   Keys connections;
diff --git a/libminifi/include/core/flow/StructuredConfiguration.h 
b/libminifi/include/core/flow/StructuredConfiguration.h
index 8909a351e..d452f1019 100644
--- a/libminifi/include/core/flow/StructuredConfiguration.h
+++ b/libminifi/include/core/flow/StructuredConfiguration.h
@@ -25,7 +25,7 @@
 
 #include "core/FlowConfiguration.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "core/ProcessorConfig.h"
+#include "minifi-cpp/core/ProcessorConfig.h"
 #include "Exception.h"
 #include "io/validation.h"
 #include "sitetosite/SiteToSite.h"
diff --git a/libminifi/include/core/logging/LoggerBase.h 
b/libminifi/include/core/logging/LoggerBase.h
index ab2737ba2..a0132d87b 100644
--- a/libminifi/include/core/logging/LoggerBase.h
+++ b/libminifi/include/core/logging/LoggerBase.h
@@ -81,6 +81,36 @@ inline LOG_LEVEL 
mapFromSpdLogLevel(spdlog::level::level_enum level) {
   throw std::invalid_argument(fmt::format("Invalid spdlog::level::level_enum 
{}", magic_enum::enum_underlying(level)));
 }
 
+inline std::string mapLogLevelToString(LOG_LEVEL level) {
+  switch (level) {
+    case trace: return "TRACE";
+    case debug: return "DEBUG";
+    case info: return "INFO";
+    case warn: return "WARN";
+    case err: return "ERROR";
+    case critical: return "CRITICAL";
+    case off: return "OFF";
+  }
+  throw std::invalid_argument(fmt::format("Invalid LOG_LEVEL {}", 
magic_enum::enum_underlying(level)));
+}
+
+inline LOG_LEVEL mapStringToLogLevel(const std::string& level_str) {
+  if (level_str == "TRACE") {
+    return trace;
+  } else if (level_str == "DEBUG") {
+    return debug;
+  } else if (level_str == "INFO") {
+    return info;
+  } else if (level_str == "WARN") {
+    return warn;
+  } else if (level_str == "ERROR") {
+    return err;
+  } else if (level_str == "CRITICAL") {
+    return critical;
+  }
+  throw std::invalid_argument(fmt::format("Invalid LOG_LEVEL {}", level_str));
+}
+
 class LoggerBase : public Logger {
  public:
   LoggerBase(LoggerBase const&) = delete;
@@ -93,6 +123,7 @@ class LoggerBase : public Logger {
   bool should_log(LOG_LEVEL level) override;
   void log_string(LOG_LEVEL level, std::string str) override;
   LOG_LEVEL level() const override;
+  void setLogCallback(const std::function<void(LOG_LEVEL level, const 
std::string&)>& callback) override;
 
  protected:
   LoggerBase(std::shared_ptr<spdlog::logger> delegate, 
std::shared_ptr<LoggerControl> controller);
@@ -110,6 +141,7 @@ class LoggerBase : public Logger {
 
  private:
   std::atomic<int> max_log_size_{LOG_BUFFER_SIZE};
+  std::function<void(LOG_LEVEL level, const std::string&)> log_callback_;
 };
 
 }  // namespace org::apache::nifi::minifi::core::logging
diff --git a/libminifi/include/core/state/MetricsPublisherStore.h 
b/libminifi/include/core/state/MetricsPublisherStore.h
index 4be8c3504..61cad498b 100644
--- a/libminifi/include/core/state/MetricsPublisherStore.h
+++ b/libminifi/include/core/state/MetricsPublisherStore.h
@@ -28,13 +28,14 @@
 #include "utils/gsl.h"
 #include "core/ProcessGroup.h"
 #include "utils/file/AssetManager.h"
+#include "core/BulletinStore.h"
 
 namespace org::apache::nifi::minifi::state {
 
 class MetricsPublisherStore {
  public:
   MetricsPublisherStore(std::shared_ptr<Configure> configuration, const 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& 
repository_metric_sources,
-    std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager = nullptr);
+    std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* 
bulletin_store = nullptr);
   void initialize(core::controller::ControllerServiceProvider* controller, 
state::StateMonitor* update_sink);
   void loadMetricNodes(core::ProcessGroup* root);
   void clearMetricNodes();
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h 
b/libminifi/include/core/state/nodes/FlowInformation.h
index 7ba9a4931..e0f69b2ca 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -27,6 +27,7 @@
 #include "Connection.h"
 #include "core/state/ConnectionStore.h"
 #include "core/Processor.h"
+#include "core/BulletinStore.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -121,6 +122,10 @@ class FlowInformation : public StateMonitorNode {
     processors_ = std::move(processors);
   }
 
+  void setBulletinStore(core::BulletinStore* bulletin_store) {
+    bulletin_store_ = bulletin_store;
+  }
+
   std::vector<SerializedResponseNode> serialize() override;
   std::vector<PublishedMetric> calculateMetrics() override;
 
@@ -128,6 +133,7 @@ class FlowInformation : public StateMonitorNode {
   std::shared_ptr<state::response::FlowVersion> flow_version_;
   ConnectionStore connection_store_;
   std::vector<core::Processor*> processors_;
+  core::BulletinStore* bulletin_store_ = nullptr;
 };
 
 }  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h 
b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 69fc73a7b..018bc5e08 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -36,13 +36,14 @@
 #include "core/RepositoryMetricsSource.h"
 #include "utils/file/AssetManager.h"
 #include "minifi-cpp/core/state/nodes/ResponseNodeLoader.h"
+#include "core/BulletinStore.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
 class ResponseNodeLoaderImpl : public ResponseNodeLoader {
  public:
   ResponseNodeLoaderImpl(std::shared_ptr<Configure> configuration, 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repository_metric_sources,
-    std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager = nullptr);
+    std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* 
bulletin_store = nullptr);
 
   void setNewConfigRoot(core::ProcessGroup* root) override;
   void clearConfigRoot() override;
@@ -81,6 +82,7 @@ class ResponseNodeLoaderImpl : public ResponseNodeLoader {
   utils::file::AssetManager* asset_manager_{};
   core::controller::ControllerServiceProvider* controller_{};
   state::StateMonitor* update_sink_{};
+  core::BulletinStore* bulletin_store_{};
   std::shared_ptr<core::logging::Logger> 
logger_{core::logging::LoggerFactory<ResponseNodeLoader>::getLogger()};
 };
 
diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h 
b/libminifi/include/core/state/nodes/SchedulingNodes.h
index 7321b8467..31ccf50de 100644
--- a/libminifi/include/core/state/nodes/SchedulingNodes.h
+++ b/libminifi/include/core/state/nodes/SchedulingNodes.h
@@ -22,7 +22,7 @@
 #include <vector>
 
 #include "MetricsBase.h"
-#include "core/ProcessorConfig.h"
+#include "minifi-cpp/core/ProcessorConfig.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h 
b/libminifi/include/core/yaml/YamlConfiguration.h
index 3eee7acbd..790b590de 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -25,7 +25,7 @@
 
 #include "core/FlowConfiguration.h"
 #include "core/logging/LoggerFactory.h"
-#include "core/ProcessorConfig.h"
+#include "minifi-cpp/core/ProcessorConfig.h"
 #include "Exception.h"
 #include "io/validation.h"
 #include "sitetosite/SiteToSite.h"
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index faee19047..d16401719 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -99,6 +99,7 @@ const std::unordered_map<std::string_view, 
gsl::not_null<const core::PropertyVal
   {Configuration::nifi_c2_rest_ssl_context_service, 
gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)},
   {Configuration::nifi_c2_rest_request_encoding, 
gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)},
   {Configuration::nifi_c2_rest_heartbeat_minimize_updates, 
gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
+  {Configuration::nifi_c2_flow_info_processor_bulletin_limit, 
gsl::make_not_null(&core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)},
   {Configuration::nifi_state_storage_local, 
gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)},
   {Configuration::nifi_state_storage_local_old, 
gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)},
   {Configuration::nifi_state_storage_local_class_name, 
gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)},
diff --git a/libminifi/src/core/BulletinStore.cpp 
b/libminifi/src/core/BulletinStore.cpp
new file mode 100644
index 000000000..e49b67c9a
--- /dev/null
+++ b/libminifi/src/core/BulletinStore.cpp
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/BulletinStore.h"
+
+#include "core/logging/LoggerBase.h"
+
+namespace org::apache::nifi::minifi::core {
+
+/**
+ * Reads the processor bulletin limit from the agent configuration.
+ *
+ * DEFAULT_BULLETIN_COUNT is used when the property is not set or when std::stoul
+ * cannot parse its value.
+ */
+BulletinStore::BulletinStore(const Configure &configure) {
+  const auto configured_limit = configure.get(Configuration::nifi_c2_flow_info_processor_bulletin_limit);
+  if (configured_limit) {
+    try {
+      max_bulletin_count_ = std::stoul(*configured_limit);
+    } catch(const std::exception&) {
+      logger_->log_warn("Invalid value for bulletin limit, using default value of {}", DEFAULT_BULLETIN_COUNT);
+      max_bulletin_count_ = DEFAULT_BULLETIN_COUNT;
+    }
+    return;
+  }
+  logger_->log_debug("Bulletin limit not set, using default value of {}", DEFAULT_BULLETIN_COUNT);
+  max_bulletin_count_ = DEFAULT_BULLETIN_COUNT;
+}
+
+/**
+ * Records a log message emitted by a processor as a bulletin.
+ *
+ * The bulletin stores the processor's id and name together with the id, name and path of
+ * its process group. When the store already holds max_bulletin_count_ entries, the oldest
+ * entry is dropped before the new one is appended.
+ *
+ * @param processor processor the message belongs to
+ * @param log_level severity of the message, stored in its string form
+ * @param message   text of the log message
+ */
+void BulletinStore::addProcessorBulletin(const core::Processor& processor, core::logging::LOG_LEVEL log_level, const std::string& message) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (max_bulletin_count_ == 0) {
+    // A limit of zero leaves no room for any bulletin. Returning here also keeps the
+    // pop_front() below from running on an empty deque, which is undefined behavior.
+    return;
+  }
+  Bulletin bulletin;
+  bulletin.id = id_counter++;
+  bulletin.timestamp = std::chrono::system_clock::now();
+  bulletin.level = core::logging::mapLogLevelToString(log_level);
+  bulletin.category = "Log Message";
+  bulletin.message = message;
+  bulletin.group_id = processor.getProcessGroupUUIDStr();
+  bulletin.group_name = processor.getProcessGroupName();
+  bulletin.group_path = processor.getProcessGroupPath();
+  bulletin.source_id = processor.getUUIDStr();
+  bulletin.source_name = processor.getName();
+  // Evict the oldest bulletin once the limit is reached so the store never grows past it.
+  if (bulletins_.size() >= max_bulletin_count_) {
+    bulletins_.pop_front();
+  }
+  bulletins_.push_back(std::move(bulletin));
+}
+
+/**
+ * Returns a copy of the stored bulletins.
+ *
+ * Without a time interval every stored bulletin is returned. With one, only the bulletins
+ * whose timestamp is not older than (now - interval) are returned. The first such bulletin
+ * is located with std::lower_bound, so the lookup relies on bulletins_ being ordered by
+ * timestamp.
+ */
+std::deque<Bulletin> BulletinStore::getBulletins(std::optional<std::chrono::system_clock::duration> time_interval_to_include) const {
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (!time_interval_to_include.has_value()) {
+    return bulletins_;
+  }
+
+  const auto oldest_allowed = std::chrono::system_clock::now() - time_interval_to_include.value();
+  const auto first_recent = std::lower_bound(bulletins_.begin(), bulletins_.end(), oldest_allowed,
+    [](const auto& entry, const auto& cutoff) {
+      return entry.timestamp < cutoff;
+    });
+  // An empty iterator range produces an empty deque, which covers the "nothing recent" case.
+  return std::deque<Bulletin>(first_recent, bulletins_.end());
+}
+
+size_t BulletinStore::getMaxBulletinCount() const {  // Returns the bulletin limit held in max_bulletin_count_.
+  return max_bulletin_count_;
+}
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/FlowConfiguration.cpp 
b/libminifi/src/core/FlowConfiguration.cpp
index effcbb872..840114c4d 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -38,6 +38,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx)
       filesystem_(std::move(ctx.filesystem)),
       
sensitive_values_encryptor_(std::move(ctx.sensitive_values_encryptor.value())),
       asset_manager_(ctx.asset_manager),
+      bulletin_store_(ctx.bulletin_store),
       logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
   std::string flowUrl;
   std::string bucket_id = "default";
diff --git a/libminifi/src/core/ProcessGroup.cpp 
b/libminifi/src/core/ProcessGroup.cpp
index 4f274575d..df183661c 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -87,6 +87,8 @@ std::tuple<Processor*, bool> 
ProcessGroup::addProcessor(std::unique_ptr<Processo
   const auto name = processor->getName();
   std::lock_guard<std::recursive_mutex> lock(mutex_);
   processor->setProcessGroupUUIDStr(getUUIDStr());
+  processor->setProcessGroupName(getName());
+  processor->setProcessGroupPath(buildGroupPath());
   const auto [iter, inserted] = processors_.insert(std::move(processor));
   if (inserted) {
     logger_->log_debug("Add processor {} into process group {}", name, name_);
@@ -485,4 +487,15 @@ ParameterContext* ProcessGroup::getParameterContext() 
const {
   return parameter_context_;
 }
 
+/**
+ * Builds the " / "-separated path of this group: the names of its ancestors, outermost
+ * first, followed by this group's own name.
+ */
+std::string ProcessGroup::buildGroupPath() const {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::string group_path = name_;
+  // Walk up the parent chain, prepending each ancestor's name in turn.
+  for (auto ancestor = parent_process_group_; ancestor != nullptr; ancestor = ancestor->getParent()) {
+    group_path = ancestor->getName() + " / " + group_path;
+  }
+  return group_path;
+}
+
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/flow/FlowSchema.cpp 
b/libminifi/src/core/flow/FlowSchema.cpp
index 812b578f0..7d80603d9 100644
--- a/libminifi/src/core/flow/FlowSchema.cpp
+++ b/libminifi/src/core/flow/FlowSchema.cpp
@@ -31,6 +31,7 @@ FlowSchema FlowSchema::getDefault() {
       .max_concurrent_tasks = {"max concurrent tasks"},
       .penalization_period = {"penalization period"},
       .proc_yield_period = {"yield period"},
+      .bulletin_level = {"bulletin level"},
       .runduration_nanos = {"run duration nanos"},
 
       .connections = {"Connections"},
@@ -101,6 +102,7 @@ FlowSchema FlowSchema::getNiFiFlowJson() {
       .max_concurrent_tasks = {"concurrentlySchedulableTaskCount"},
       .penalization_period = {"penaltyDuration"},
       .proc_yield_period = {"yieldDuration"},
+      .bulletin_level = {"bulletinLevel"},
       // TODO(adebreceni): MINIFICPP-2033 since this is unused the mismatch 
between nano and milli is not an issue
       .runduration_nanos = {"runDurationMillis"},
 
diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp 
b/libminifi/src/core/flow/StructuredConfiguration.cpp
index c12d6e0dc..e056c595e 100644
--- a/libminifi/src/core/flow/StructuredConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -359,6 +359,11 @@ void StructuredConfiguration::parseProcessorNode(const 
Node& processors_node, co
       logger_->log_debug("parseProcessorNode: yield period => [{}]", 
procCfg.yieldPeriod);
     }
 
+    if (auto bulletin_level_node = procNode[schema_.bulletin_level]) {
+      procCfg.bulletinLevel = bulletin_level_node.getString().value();
+      logger_->log_debug("parseProcessorNode: bulletin level => [{}]", 
procCfg.bulletinLevel);
+    }
+
     if (auto runNode = procNode[schema_.runduration_nanos]) {
       procCfg.runDurationNanos = runNode.getIntegerAsString().value();
       logger_->log_debug("parseProcessorNode: run duration nanos => [{}]", 
procCfg.runDurationNanos);
@@ -401,6 +406,18 @@ void StructuredConfiguration::parseProcessorNode(const 
Node& processors_node, co
       processor->setYieldPeriodMsec(yield_period.value());
     }
 
+    if (!procCfg.bulletinLevel.empty()) {
+      
processor->setLogBulletinLevel(core::logging::mapStringToLogLevel(procCfg.bulletinLevel));
+    }
+    processor->setLoggerCallback([this, processor = 
processor.get()](core::logging::LOG_LEVEL level, const std::string& message) {
+      if (level < processor->getLogBulletinLevel()) {
+        return;
+      }
+      if (bulletin_store_) {
+        bulletin_store_->addProcessorBulletin(*processor, level, message);
+      }
+    });
+
     // Default to running
     processor->setScheduledState(core::RUNNING);
 
diff --git a/libminifi/src/core/logging/LoggerBase.cpp 
b/libminifi/src/core/logging/LoggerBase.cpp
index f4415a891..e90149b43 100644
--- a/libminifi/src/core/logging/LoggerBase.cpp
+++ b/libminifi/src/core/logging/LoggerBase.cpp
@@ -36,6 +36,10 @@ void LoggerControl::setEnabled(bool status) {
   is_enabled_ = status;
 }
 
+void LoggerBase::setLogCallback(const std::function<void(LOG_LEVEL level, 
const std::string&)>& callback) {  // Stores a copy of callback; log_string() invokes it when it is non-empty.
+  std::lock_guard<std::mutex> lock(mutex_);  // the assignment below happens while holding mutex_
+  log_callback_ = callback;
+}
 
 bool LoggerBase::should_log(LOG_LEVEL level) {
   if (controller_ && !controller_->is_enabled())
@@ -46,6 +50,9 @@ bool LoggerBase::should_log(LOG_LEVEL level) {
 }
 
 void LoggerBase::log_string(LOG_LEVEL level, std::string str) {  // Passes the message to the optional callback first, then to delegate_.
+  if (log_callback_) {  // NOTE(review): read here without mutex_, while setLogCallback() writes it under mutex_ — confirm set and log never run concurrently
+    log_callback_(level, str);
+  }
   delegate_->log(mapToSpdLogLevel(level), str.c_str());
 }
 
diff --git a/libminifi/src/core/state/MetricsPublisherStore.cpp 
b/libminifi/src/core/state/MetricsPublisherStore.cpp
index 85fddb39a..6c89e58c1 100644
--- a/libminifi/src/core/state/MetricsPublisherStore.cpp
+++ b/libminifi/src/core/state/MetricsPublisherStore.cpp
@@ -23,9 +23,9 @@
 namespace org::apache::nifi::minifi::state {
 
 MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr<Configure> 
configuration, const 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& 
repository_metric_sources,
-  std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager)
+  std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager, core::BulletinStore* bulletin_store)
     : configuration_(configuration),
-      
response_node_loader_(std::make_shared<response::ResponseNodeLoaderImpl>(std::move(configuration),
 repository_metric_sources, std::move(flow_configuration), asset_manager)) {
+      
response_node_loader_(std::make_shared<response::ResponseNodeLoaderImpl>(std::move(configuration),
 repository_metric_sources, std::move(flow_configuration), asset_manager, 
bulletin_store)) {
 }
 
 void 
MetricsPublisherStore::initialize(core::controller::ControllerServiceProvider* 
controller, state::StateMonitor* update_sink) {
diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp 
b/libminifi/src/core/state/nodes/FlowInformation.cpp
index bbe9fc451..56584156b 100644
--- a/libminifi/src/core/state/nodes/FlowInformation.cpp
+++ b/libminifi/src/core/state/nodes/FlowInformation.cpp
@@ -18,6 +18,9 @@
 #include "core/state/nodes/FlowInformation.h"
 #include "core/Resource.h"
 #include "core/state/Value.h"
+#include "utils/TimeUtil.h"
+
+using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -99,6 +102,30 @@ std::vector<SerializedResponseNode> 
FlowInformation::serialize() {
     serialized.push_back(processorsStatusesNode);
   }
 
+  if (bulletin_store_) {
+    SerializedResponseNode processorBulletinsNode{.name = 
"processorBulletins", .array = true, .collapsible = false};
+    auto bulletins = bulletin_store_->getBulletins(5min);
+    for (const auto& bulletin : bulletins) {
+      processorBulletinsNode.children.push_back({
+        .name = std::to_string(bulletin.id),
+        .collapsible = false,
+        .children = {
+          {.name = "id", .value = bulletin.id},
+          {.name = "timestamp", .value = 
utils::timeutils::getNiFiDateTimeFormat(std::chrono::time_point_cast<std::chrono::seconds>(bulletin.timestamp))},
+          {.name = "level", .value = bulletin.level},
+          {.name = "category", .value = bulletin.category},
+          {.name = "message", .value = bulletin.message},
+          {.name = "groupId", .value = bulletin.group_id},
+          {.name = "groupName", .value = bulletin.group_name},
+          {.name = "groupPath", .value = bulletin.group_path},
+          {.name = "sourceId", .value = bulletin.source_id},
+          {.name = "sourceName", .value = bulletin.source_name}
+        }
+      });
+    }
+    serialized.push_back(processorBulletinsNode);
+  }
+
   return serialized;
 }
 
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp 
b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index ef8d6e867..257eab3df 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -34,11 +34,12 @@
 namespace org::apache::nifi::minifi::state::response {
 
 ResponseNodeLoaderImpl::ResponseNodeLoaderImpl(std::shared_ptr<Configure> 
configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repository_metric_sources,
-  std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager)
+  std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager, core::BulletinStore* bulletin_store)
     : configuration_(std::move(configuration)),
       repository_metric_sources_(std::move(repository_metric_sources)),
       flow_configuration_(std::move(flow_configuration)),
-      asset_manager_(asset_manager) {
+      asset_manager_(asset_manager),
+      bulletin_store_(bulletin_store) {
 }
 
 void ResponseNodeLoaderImpl::clearConfigRoot() {
@@ -233,6 +234,10 @@ void 
ResponseNodeLoaderImpl::initializeFlowInformation(const SharedResponseNode&
     flow_information->setFlowVersion(flow_configuration_->getFlowVersion());
   }
 
+  if (bulletin_store_) {
+    flow_information->setBulletinStore(bulletin_store_);
+  }
+
   if (root_) {
     std::vector<core::Processor*> processors;
     root_->getAllProcessors(processors);
diff --git a/libminifi/test/integration/C2MetricsTest.cpp 
b/libminifi/test/integration/C2MetricsTest.cpp
index 1aad28e5e..fb51805b4 100644
--- a/libminifi/test/integration/C2MetricsTest.cpp
+++ b/libminifi/test/integration/C2MetricsTest.cpp
@@ -45,7 +45,7 @@ class VerifyC2Metrics : public VerifyC2Base {
     LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
     LogTestController::getInstance().setDebug<minifi::FlowController>();
-    LogTestController::getInstance().setOff<minifi::processors::GetTCP>();
+    LogTestController::getInstance().setDebug<minifi::processors::GetTCP>();
     VerifyC2Base::testSetup();
   }
 
@@ -93,6 +93,7 @@ class MetricsHandler: public HeartbeatHandler {
     VERIFY_UPDATED_METRICS
   };
 
+  static constexpr const char* PROCESS_GROUP_UUID = 
"2438e3c8-015a-1000-79ca-83af40ec1990";
   static constexpr const char* GETTCP_UUID = 
"2438e3c8-015a-1000-79ca-83af40ec1991";
   static constexpr const char* LOGATTRIBUTE1_UUID = 
"2438e3c8-015a-1000-79ca-83af40ec1992";
   static constexpr const char* LOGATTRIBUTE2_UUID = 
"5128e3c8-015a-1000-79ca-83af40ec1990";
@@ -160,6 +161,27 @@ class MetricsHandler: public HeartbeatHandler {
       runtime_metrics["flowInfo"].HasMember("processorStatuses");
   }
 
+  /**
+   * Checks that flowInfo.processorBulletins contains at least one ERROR bulletin from the
+   * GetTCP processor about a failed connection, carrying the expected group and source fields.
+   */
+  static bool verifyProcessorBulletins(const rapidjson::Value& runtime_metrics) {
+    const auto& flow_info = runtime_metrics["flowInfo"];
+    if (!flow_info.HasMember("processorBulletins")) {
+      return false;
+    }
+    const auto is_expected_bulletin = [](const auto& bulletin) {
+      // Copies the string member stored under the given key.
+      const auto text_of = [&bulletin](const char* key) { return std::string{bulletin[key].GetString()}; };
+      const std::string message = text_of("message");
+      return bulletin["id"].GetInt() > 0 &&
+        !text_of("timestamp").empty() &&
+        text_of("level") == "ERROR" &&
+        text_of("category") == "Log Message" &&
+        message.find("Error connecting to") != std::string::npos &&
+        message.find(GETTCP_UUID) != std::string::npos &&
+        text_of("groupId") == PROCESS_GROUP_UUID &&
+        text_of("groupName") == "MiNiFi Flow" &&
+        text_of("groupPath") == "MiNiFi Flow" &&
+        text_of("sourceId") == GETTCP_UUID &&
+        text_of("sourceName") == "GetTCP";
+    };
+    const auto bulletins = flow_info["processorBulletins"].GetArray();
+    return std::any_of(bulletins.begin(), bulletins.end(), is_expected_bulletin);
+  }
+
   static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
     return verifyCommonRuntimeMetricNodes(runtime_metrics, 
"2438e3c8-015a-1000-79ca-83af40ec1997") &&
       [&]() {
@@ -173,7 +195,8 @@ class MetricsHandler: public HeartbeatHandler {
           }
           return processorMetricsAreValid(processor);
         });
-      }();
+      }() &&
+      verifyProcessorBulletins(runtime_metrics);
   }
 
   static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& 
runtime_metrics) {
diff --git a/libminifi/test/libtest/integration/IntegrationBase.cpp 
b/libminifi/test/libtest/integration/IntegrationBase.cpp
index 3652e0ccb..a7facb26d 100644
--- a/libminifi/test/libtest/integration/IntegrationBase.cpp
+++ b/libminifi/test/libtest/integration/IntegrationBase.cpp
@@ -91,6 +91,7 @@ void IntegrationBase::run(const 
std::optional<std::filesystem::path>& test_file_
     std::string nifi_configuration_class_name = "adaptiveconfiguration";
     configuration->get(minifi::Configure::nifi_configuration_class_name, 
nifi_configuration_class_name);
 
+    bulletin_store_ = std::make_unique<core::BulletinStore>(*configuration);
     std::shared_ptr<core::FlowConfiguration> flow_config = 
core::createFlowConfiguration(
         core::ConfigurationContext{
             .flow_file_repo = test_repo,
@@ -98,7 +99,8 @@ void IntegrationBase::run(const 
std::optional<std::filesystem::path>& test_file_
             .configuration = configuration,
             .path = test_file_location,
             .filesystem = filesystem,
-            .sensitive_values_encryptor = sensitive_values_encryptor
+            .sensitive_values_encryptor = sensitive_values_encryptor,
+            .bulletin_store = bulletin_store_.get()
         }, nifi_configuration_class_name);
 
     auto controller_service_provider = 
flow_config->getControllerServiceProvider();
@@ -119,7 +121,7 @@ void IntegrationBase::run(const 
std::optional<std::filesystem::path>& test_file_
 
     std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{test_repo, test_flow_repo, content_repo};
     asset_manager_ = 
std::make_unique<minifi::utils::file::AssetManager>(*configuration);
-    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
repo_metric_sources, flow_config, asset_manager_.get());
+    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
repo_metric_sources, flow_config, asset_manager_.get(), bulletin_store_.get());
     flowController_ = std::make_unique<minifi::FlowController>(test_repo, 
test_flow_repo, configuration,
       std::move(flow_config), content_repo, 
std::move(metrics_publisher_store), filesystem, request_restart, 
asset_manager_.get());
     flowController_->load();
diff --git a/libminifi/test/libtest/integration/IntegrationBase.h 
b/libminifi/test/libtest/integration/IntegrationBase.h
index 76c518606..002820486 100644
--- a/libminifi/test/libtest/integration/IntegrationBase.h
+++ b/libminifi/test/libtest/integration/IntegrationBase.h
@@ -30,6 +30,7 @@
 #include "properties/Configure.h"
 #include "utils/file/AssetManager.h"
 #include "utils/file/FileUtils.h"
+#include "core/BulletinStore.h"
 
 namespace minifi = org::apache::nifi::minifi;
 namespace core = minifi::core;
@@ -110,6 +111,7 @@ class IntegrationBase {
   void configureSecurity();
   std::shared_ptr<minifi::Configure> configuration;
   std::unique_ptr<minifi::utils::file::AssetManager> asset_manager_;
+  std::unique_ptr<core::BulletinStore> bulletin_store_;
   std::unique_ptr<minifi::state::response::ResponseNodeLoader> 
response_node_loader_;
   std::unique_ptr<minifi::FlowController> flowController_;
   std::chrono::milliseconds wait_time_;
diff --git a/libminifi/test/resources/TestC2Metrics.yml 
b/libminifi/test/resources/TestC2Metrics.yml
index 6a0af5e4c..fdb892419 100644
--- a/libminifi/test/resources/TestC2Metrics.yml
+++ b/libminifi/test/resources/TestC2Metrics.yml
@@ -29,6 +29,7 @@ Processors:
       penalization period: 30 sec
       yield period: 10 sec
       run duration nanos: 0
+      bulletin level: ERROR
       auto-terminated relationships list:
       Properties:
           Endpoint List: localhost:8776
@@ -43,6 +44,7 @@ Processors:
       scheduling period: 30 sec
       penalization period: 30 sec
       yield period: 1 sec
+      bulletin level: ERROR
       run duration nanos: 0
       auto-terminated relationships list:
         - response
diff --git a/libminifi/test/unit/BulletinStoreTests.cpp 
b/libminifi/test/unit/BulletinStoreTests.cpp
new file mode 100644
index 000000000..deb811725
--- /dev/null
+++ b/libminifi/test/unit/BulletinStoreTests.cpp
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "core/BulletinStore.h"
+#include "properties/Configure.h"
+#include "unit/DummyProcessor.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+class BulletinStoreTestAccessor {  // Test helper exposing BulletinStore's internal bulletin deque for modification.
+ public:
+  static std::deque<core::Bulletin>& getBulletins(core::BulletinStore& store) {
+    return store.bulletins_;  // presumably permitted through a friend declaration in BulletinStore — verify against the header
+  }
+};
+
+std::unique_ptr<core::Processor> createDummyProcessor() {  // Builds a DummyProcessor with a fixed UUID and fixed process-group id, name and path.
+  auto processor = std::make_unique<DummyProcessor>("DummyProcessor", 
minifi::utils::Identifier::parse("4d7fa7e6-2459-46dd-b2ba-61517239edf5").value());
+  processor->setProcessGroupUUIDStr("68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2");
+  processor->setProcessGroupName("sub_group");
+  processor->setProcessGroupPath("root / sub_group");
+  return processor;
+}
+
+TEST_CASE("Create BulletinStore with default max size of 1000", 
"[bulletinStore]") {  // Both sections must end up with the default limit of 1000.
+  ConfigureImpl configuration;
+  SECTION("No limit is configured") {}  // the limit property is left unset
+  SECTION("Invalid value is configured") {
+    configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, 
"invalid");
+  }
+  core::BulletinStore bulletin_store(configuration);
+  REQUIRE(bulletin_store.getMaxBulletinCount() == 1000);
+}
+
+TEST_CASE("Create BulletinStore with custom max size of 10000", 
"[bulletinStore]") {  // A valid numeric limit from the configuration must be used as-is.
+  ConfigureImpl configuration;
+  configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, 
"10000");
+  core::BulletinStore bulletin_store(configuration);
+  REQUIRE(bulletin_store.getMaxBulletinCount() == 10000);
+}
+
+TEST_CASE("Remove oldest entries when limit is reached", "[bulletinStore]") {  // Adding past the limit must drop the oldest bulletin.
+  ConfigureImpl configuration;
+  configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, 
"2");
+  core::BulletinStore bulletin_store(configuration);
+  auto processor = createDummyProcessor();
+  for (size_t i = 0; i < 3; ++i) {  // three bulletins are added against a limit of two
+    bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, 
"Warning message");
+  }
+  auto bulletins = bulletin_store.getBulletins();
+  REQUIRE(bulletins.size() == 2);
+  CHECK(bulletins[0].id == 2);  // the first bulletin added is no longer in the store
+  CHECK(bulletins[0].level == "WARN");
+  CHECK(bulletins[0].category == "Log Message");
+  CHECK(bulletins[0].message == "Warning message");
+  CHECK(bulletins[0].group_id == "68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2");
+  CHECK(bulletins[0].group_name == "sub_group");
+  CHECK(bulletins[0].group_path == "root / sub_group");
+  CHECK(bulletins[0].source_id == "4d7fa7e6-2459-46dd-b2ba-61517239edf5");
+  CHECK(bulletins[0].source_name == "DummyProcessor");
+  CHECK(bulletins[1].id == 3);
+}
+
+TEST_CASE("Return all bulletins when no time interval is defined", 
"[bulletinStore]") {  // getBulletins() without an argument must return every stored bulletin.
+  ConfigureImpl configuration;
+  core::BulletinStore bulletin_store(configuration);
+  auto processor = createDummyProcessor();
+  for (size_t i = 0; i < 3; ++i) {
+    bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, 
"Warning message");
+  }
+  auto bulletins = bulletin_store.getBulletins();
+  REQUIRE(bulletins.size() == 3);
+  CHECK(bulletins[0].id == 1);  // bulletins come back in insertion order
+  CHECK(bulletins[0].level == "WARN");
+  CHECK(bulletins[0].category == "Log Message");
+  CHECK(bulletins[0].message == "Warning message");
+  CHECK(bulletins[0].group_id == "68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2");
+  CHECK(bulletins[0].group_name == "sub_group");
+  CHECK(bulletins[0].group_path == "root / sub_group");
+  CHECK(bulletins[0].source_id == "4d7fa7e6-2459-46dd-b2ba-61517239edf5");
+  CHECK(bulletins[0].source_name == "DummyProcessor");
+  CHECK(bulletins[1].id == 2);
+  CHECK(bulletins[2].id == 3);
+}
+
+TEST_CASE("Return only bulletins that are inside the defined time interval", 
"[bulletinStore]") {  // Bulletins older than the requested interval must be left out.
+  ConfigureImpl configuration;
+  core::BulletinStore bulletin_store(configuration);
+  auto processor = createDummyProcessor();
+  for (size_t i = 0; i < 3; ++i) {
+    bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, 
"Warning message");
+  }
+  BulletinStoreTestAccessor::getBulletins(bulletin_store)[0].timestamp -= 5min;  // backdate the first bulletin so it falls outside the 3min window queried below
+
+  auto bulletins = bulletin_store.getBulletins(3min);
+  REQUIRE(bulletins.size() == 2);
+  CHECK(bulletins[0].id == 2);
+  CHECK(bulletins[0].level == "WARN");
+  CHECK(bulletins[0].category == "Log Message");
+  CHECK(bulletins[0].message == "Warning message");
+  CHECK(bulletins[0].group_id == "68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2");
+  CHECK(bulletins[0].group_name == "sub_group");
+  CHECK(bulletins[0].group_path == "root / sub_group");
+  CHECK(bulletins[0].source_id == "4d7fa7e6-2459-46dd-b2ba-61517239edf5");
+  CHECK(bulletins[0].source_name == "DummyProcessor");
+  CHECK(bulletins[1].id == 3);
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/minifi-api/include/minifi-cpp/core/Processor.h 
b/minifi-api/include/minifi-cpp/core/Processor.h
index b732c21eb..ded0f7570 100644
--- a/minifi-api/include/minifi-cpp/core/Processor.h
+++ b/minifi-api/include/minifi-cpp/core/Processor.h
@@ -33,6 +33,7 @@
 #include "minifi-cpp/core/state/nodes/MetricsBase.h"
 #include "ProcessorMetrics.h"
 #include "utils/gsl.h"
+#include "core/logging/Logger.h"
 
 namespace org::apache::nifi::minifi {
 
@@ -88,6 +89,13 @@ class Processor : public virtual Connectable, public virtual 
ConfigurableCompone
   virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const 
= 0;
   virtual std::string getProcessGroupUUIDStr() const = 0;
   virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0;
+  virtual std::string getProcessGroupName() const = 0;
+  virtual void setProcessGroupName(const std::string &name) = 0;
+  virtual std::string getProcessGroupPath() const = 0;
+  virtual void setProcessGroupPath(const std::string &path) = 0;
+  virtual logging::LOG_LEVEL getLogBulletinLevel() const = 0;
+  virtual void setLogBulletinLevel(logging::LOG_LEVEL level) = 0;
+  virtual void setLoggerCallback(const std::function<void(logging::LOG_LEVEL 
level, const std::string& message)>& callback) = 0;
 
   virtual void updateReachability(const std::lock_guard<std::mutex>& 
graph_lock, bool force = false) = 0;
   virtual const std::unordered_map<Connection*, 
std::unordered_set<Processor*>>& reachable_processors() const = 0;
diff --git a/minifi-api/include/minifi-cpp/core/ProcessorConfig.h 
b/minifi-api/include/minifi-cpp/core/ProcessorConfig.h
index 0a943ed1a..56052a0fd 100644
--- a/minifi-api/include/minifi-cpp/core/ProcessorConfig.h
+++ b/minifi-api/include/minifi-cpp/core/ProcessorConfig.h
@@ -41,6 +41,7 @@ struct ProcessorConfig {
   std::string schedulingPeriod;
   std::string penalizationPeriod;
   std::string yieldPeriod;
+  std::string bulletinLevel;
   std::string runDurationNanos;
   std::vector<std::string> autoTerminatedRelationships;
   std::vector<core::Property> properties;
diff --git a/minifi-api/include/minifi-cpp/core/logging/Logger.h 
b/minifi-api/include/minifi-cpp/core/logging/Logger.h
index f94a46758..e87f630d3 100644
--- a/minifi-api/include/minifi-cpp/core/logging/Logger.h
+++ b/minifi-api/include/minifi-cpp/core/logging/Logger.h
@@ -99,6 +99,8 @@ class Logger {
 
   virtual ~Logger() = default;
 
+  virtual void setLogCallback(const std::function<void(LOG_LEVEL level, const 
std::string&)>& callback) = 0;
+
  protected:
   virtual int getMaxLogSize() = 0;
 
diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h 
b/minifi-api/include/minifi-cpp/properties/Configuration.h
index af2cdcdbe..373ea9772 100644
--- a/minifi-api/include/minifi-cpp/properties/Configuration.h
+++ b/minifi-api/include/minifi-cpp/properties/Configuration.h
@@ -127,6 +127,7 @@ class Configuration : public virtual Properties {
   static constexpr const char *nifi_c2_rest_ssl_context_service = 
"nifi.c2.rest.ssl.context.service";
   static constexpr const char *nifi_c2_rest_heartbeat_minimize_updates = 
"nifi.c2.rest.heartbeat.minimize.updates";
   static constexpr const char *nifi_c2_rest_request_encoding = 
"nifi.c2.rest.request.encoding";
+  static constexpr const char *nifi_c2_flow_info_processor_bulletin_limit = 
"nifi.c2.flow.info.processor.bulletin.limit";
 
   // state management options
   static constexpr const char *nifi_state_storage_local = 
"nifi.state.storage.local";
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index effcc6681..964419f12 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -65,11 +65,11 @@
 #include "MainHelper.h"
 #include "agent/JsonSchema.h"
 #include "core/state/nodes/ResponseNodeLoader.h"
-#include "c2/C2Agent.h"
 #include "core/state/MetricsPublisherStore.h"
 #include "argparse/argparse.hpp"
 #include "agent/agent_version.h"
 #include "Fips.h"
+#include "core/BulletinStore.h"
 
 namespace minifi = org::apache::nifi::minifi;
 namespace core = minifi::core;
@@ -395,6 +395,7 @@ int main(int argc, char **argv) {
         utils::crypto::EncryptionProvider::create(minifiHome));
 
     auto asset_manager = 
std::make_unique<utils::file::AssetManager>(*configure);
+    auto bulletin_store = std::make_unique<core::BulletinStore>(*configure);
 
     std::shared_ptr<core::FlowConfiguration> flow_configuration = 
core::createFlowConfiguration(
         core::ConfigurationContext{
@@ -404,11 +405,12 @@ int main(int argc, char **argv) {
           .path = 
configure->get(minifi::Configure::nifi_flow_configuration_file),
           .filesystem = filesystem,
           .sensitive_values_encryptor = 
utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome),
-          .asset_manager = asset_manager.get()
+          .asset_manager = asset_manager.get(),
+          .bulletin_store = bulletin_store.get()
       }, nifi_configuration_class_name);
 
     std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{prov_repo, flow_repo, content_repo};
-    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configure, 
repo_metric_sources, flow_configuration, asset_manager.get());
+    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configure, 
repo_metric_sources, flow_configuration, asset_manager.get(), 
bulletin_store.get());
     const auto controller = std::make_unique<minifi::FlowController>(
         prov_repo, flow_repo, configure, std::move(flow_configuration), 
content_repo,
         std::move(metrics_publisher_store), filesystem, request_restart, 
asset_manager.get());
diff --git a/utils/include/core/Processor.h b/utils/include/core/Processor.h
index df19776a8..11c779b00 100644
--- a/utils/include/core/Processor.h
+++ b/utils/include/core/Processor.h
@@ -173,6 +173,22 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C
     process_group_uuid_ = uuid;
   }
 
+  std::string getProcessGroupName() const override {
+    return process_group_name_;
+  }
+
+  void setProcessGroupName(const std::string &name) override {
+    process_group_name_ = name;
+  }
+
+  std::string getProcessGroupPath() const override {
+    return process_group_path_;
+  }
+
+  void setProcessGroupPath(const std::string &path) override {
+    process_group_path_ = path;
+  }
+
   void yield() override;
 
   void yield(std::chrono::steady_clock::duration delta_time) override;
@@ -226,6 +242,16 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C
     return metrics_;
   }
 
+  logging::LOG_LEVEL getLogBulletinLevel() const override {
+    return log_bulletin_level_;
+  }
+
+  void setLogBulletinLevel(logging::LOG_LEVEL level) override {
+    log_bulletin_level_ = level;
+  }
+
+  void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) override;
+
   static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};
 
   static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{};
@@ -247,6 +273,7 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C
   gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
 
   std::shared_ptr<logging::Logger> logger_;
+  logging::LOG_LEVEL log_bulletin_level_ = logging::LOG_LEVEL::warn;
 
  private:
   mutable std::mutex mutex_;
@@ -270,6 +297,8 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C
   std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_;
 
   std::string process_group_uuid_;
+  std::string process_group_name_;
+  std::string process_group_path_;
 };
 
 }  // namespace core
diff --git a/utils/include/utils/TimeUtil.h b/utils/include/utils/TimeUtil.h
index c9b1d4534..b8766f42b 100644
--- a/utils/include/utils/TimeUtil.h
+++ b/utils/include/utils/TimeUtil.h
@@ -104,6 +104,10 @@ inline std::string getRFC2616Format(std::chrono::sys_seconds tp) {
   return date::format("%a, %d %b %Y %H:%M:%S %Z", tp);
 }
 
+inline std::string getNiFiDateTimeFormat(std::chrono::sys_seconds tp) {
+  return date::format("%a %b %d %H:%M:%S %Z %Y", tp);
+}
+
 inline date::sys_seconds to_sys_time(const std::tm& t) {
   using date::year;
   using date::month;
diff --git a/utils/src/core/Processor.cpp b/utils/src/core/Processor.cpp
index cf2eb6b53..95feac9f8 100644
--- a/utils/src/core/Processor.cpp
+++ b/utils/src/core/Processor.cpp
@@ -37,7 +37,6 @@
 #include "range/v3/algorithm/any_of.hpp"
 #include "fmt/format.h"
 #include "Exception.h"
-#include "core/Processor.h"
 #include "core/ProcessorMetrics.h"
 
 using namespace std::literals::chrono_literals;
@@ -370,4 +369,8 @@ std::chrono::steady_clock::duration ProcessorImpl::getYieldTime() const {
   return std::max(yield_expiration_.load()-std::chrono::steady_clock::now(), std::chrono::steady_clock::duration{0});
 }
 
+void ProcessorImpl::setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) {
+  logger_->setLogCallback(callback);
+}
+
 }  // namespace org::apache::nifi::minifi::core

Reply via email to