This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 922431faf MINIFICPP-1822 Add c2 alert capability
922431faf is described below

commit 922431fafe5e44d92072c99ca0ce70539bb71efc
Author: Adam Debreceni <[email protected]>
AuthorDate: Fri Aug 26 10:49:28 2022 +0200

    MINIFICPP-1822 Add c2 alert capability
    
    Closes #1367
    Signed-off-by: Marton Szasz <[email protected]>
---
 conf/minifi-log.properties                         |  11 +
 extensions/http-curl/tests/AlertTests.cpp          | 148 ++++++++++++
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/http-curl/tests/ServerAwareHandler.h    |   2 +
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/FlowControlProtocol.h            |   2 +-
 libminifi/include/FlowFileRecord.h                 |   2 +-
 libminifi/include/RemoteProcessorGroupPort.h       |   2 +-
 libminifi/include/SchedulingAgent.h                |   2 +-
 libminifi/include/ThreadedSchedulingAgent.h        |   2 +-
 libminifi/include/c2/PayloadSerializer.h           |   1 +
 .../controllers/LinuxPowerManagementService.h      |   2 +-
 .../controllers/NetworkPrioritizerService.h        |   2 +-
 libminifi/include/controllers/SSLContextService.h  |   2 +-
 .../include/controllers/ThreadManagementService.h  |   2 +-
 .../controllers/UpdatePolicyControllerService.h    |   2 +-
 .../AbstractAutoPersistingKeyValueStoreService.h   |   2 +-
 libminifi/include/core/FlowConfiguration.h         |   2 +-
 libminifi/include/core/Funnel.h                    |   2 +-
 libminifi/include/core/ProcessContext.h            |   2 +-
 libminifi/include/core/ProcessContextBuilder.h     |   2 +-
 libminifi/include/core/ProcessSession.h            |   2 +-
 .../include/core/ProcessSessionReadCallback.h      |   2 +-
 libminifi/include/core/Repository.h                |   2 +-
 .../controller/StandardControllerServiceNode.h     |   2 +-
 .../controller/StandardControllerServiceProvider.h |   2 +-
 .../include/core/logging/LoggerConfiguration.h     |  31 +--
 .../include/core/logging/LoggerFactory.h           |  46 ++--
 .../include/core/logging/Utils.h                   |  38 +--
 libminifi/include/core/logging/alert/AlertSink.h   | 120 +++++++++
 .../reporting/SiteToSiteProvenanceReportingTask.h  |   2 +-
 .../include/core/repository/FileSystemRepository.h |   2 +-
 .../core/repository/VolatileContentRepository.h    |   2 +-
 libminifi/include/core/state/UpdateController.h    |   1 +
 libminifi/include/core/yaml/CheckRequiredField.h   |   2 +-
 libminifi/include/core/yaml/YamlConfiguration.h    |   2 +-
 libminifi/include/core/yaml/YamlConnectionParser.h |   2 +-
 libminifi/include/io/AtomicEntryStream.h           |   2 +-
 libminifi/include/io/DescriptorStream.h            |   2 +-
 libminifi/include/io/FileStream.h                  |   2 +-
 libminifi/include/io/tls/SecureDescriptorStream.h  |   2 +-
 libminifi/include/properties/Configuration.h       |  10 +
 libminifi/include/properties/Properties.h          |   2 +-
 libminifi/include/provenance/Provenance.h          |   2 +-
 libminifi/include/sitetosite/Peer.h                |   2 +-
 libminifi/include/sitetosite/RawSocketProtocol.h   |   2 +-
 libminifi/include/utils/ByteArrayCallback.h        |   2 +-
 libminifi/include/utils/FlowFileQueue.h            |   2 +-
 libminifi/include/utils/Hash.h                     |  30 +++
 libminifi/include/utils/Id.h                       |   7 +-
 libminifi/include/utils/ListingStateManager.h      |   2 +-
 libminifi/include/utils/TestUtils.h                |  43 +++-
 libminifi/include/utils/TimeUtil.h                 |  10 +
 libminifi/include/utils/file/FileSystem.h          |   2 +-
 libminifi/include/utils/file/FileUtils.h           |   2 +-
 libminifi/src/Configuration.cpp                    |   8 +
 libminifi/src/FlowController.cpp                   |   1 +
 libminifi/src/core/logging/LoggerConfiguration.cpp |  66 +++--
 libminifi/src/core/logging/LoggerFactory.cpp       |  28 +++
 libminifi/src/core/logging/Utils.cpp               |  42 ++++
 libminifi/src/core/logging/alert/AlertSink.cpp     | 268 +++++++++++++++++++++
 libminifi/src/properties/Properties.cpp            |  16 +-
 libminifi/src/utils/Id.cpp                         |   2 +-
 libminifi/src/utils/TimeUtil.cpp                   |  36 +++
 libminifi/test/unit/SwapTestController.h           |  15 +-
 65 files changed, 877 insertions(+), 184 deletions(-)

diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties
index aeca6dd4b..db296157b 100644
--- a/conf/minifi-log.properties
+++ b/conf/minifi-log.properties
@@ -38,6 +38,17 @@ appender.rolling.max_file_size=5242880
 ## The syslog appender will log using syslog(3) on *nix, and to the Windows 
Event Log on Windows
 #appender.syslog=syslog
 
+# Alert appender to forward critical logs through HTTP
+#appender.alert1=alert
+#appender.alert1.url=<URL>
+#appender.alert1.filter=<regex pattern to match logs against>
+#appender.alert1.rate.limit=10 min
+#appender.alert1.flush.period=5 s
+#appender.alert1.batch.size=100 KB
+#appender.alert1.buffer.limit=1 MB
+#appender.alert1.level=TRACE
+#appender.alert1.ssl.context.service=<Name of the SSLContextService>
+
 logger.root=INFO,rolling
 
 #Logging configurable by namespace
diff --git a/extensions/http-curl/tests/AlertTests.cpp 
b/extensions/http-curl/tests/AlertTests.cpp
new file mode 100644
index 000000000..2e6df680a
--- /dev/null
+++ b/extensions/http-curl/tests/AlertTests.cpp
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+  explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id)) 
{}
+
+  bool handlePut(CivetServer* , struct mg_connection *conn) override {
+    auto msg = readPayload(conn);
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(msg.c_str());
+    REQUIRE(static_cast<bool>(res));
+    REQUIRE(doc.IsObject());
+    REQUIRE(doc.HasMember("agentId"));
+    REQUIRE(doc["agentId"].IsString());
+    REQUIRE(doc.HasMember("alerts"));
+    REQUIRE(doc["alerts"].IsArray());
+    REQUIRE(doc["alerts"].Size() > 0);
+    std::string id(doc["agentId"].GetString(), 
doc["agentId"].GetStringLength());
+    REQUIRE(id == agent_id_);
+    std::vector<std::string> batch;
+    for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+      REQUIRE(doc["alerts"][i].IsString());
+      batch.emplace_back(doc["alerts"][i].GetString(), 
doc["alerts"][i].GetStringLength());
+    }
+    alerts_.enqueue(std::move(batch));
+    return true;
+  }
+
+  std::string agent_id_;
+  utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+  void testSetup() override {}
+
+  void runAssertions() override {
+    verify_();
+  }
+
+  std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+  auto clock = std::make_shared<utils::ManualClock>();
+  utils::timeutils::setClock(clock);
+
+  TempDirectory dir;
+  auto flow_config_file = std::filesystem::path(dir.getPath()) / "config.yml";
+  std::ofstream(flow_config_file) << empty_flow;
+
+  std::string agent_id = "test-agent-1";
+  VerifyAlerts harness;
+  AlertHandler handler(agent_id);
+  harness.setUrl("http://localhost:0/api/alerts";, &handler);
+  
harness.getConfiguration()->set(minifi::Configuration::nifi_c2_agent_identifier,
 agent_id);
+  harness.getConfiguration()->setHome(dir.getPath());
+
+  auto log_props = std::make_shared<logging::LoggerProperties>();
+  log_props->set("appender.alert1", "alert");
+  log_props->set("appender.alert1.url", harness.getC2RestUrl());
+  log_props->set("appender.alert1.filter", ".*<begin>(.*)<end>.*");
+  log_props->set("appender.alert1.rate.limit", "10 s");
+  log_props->set("appender.alert1.flush.period", "1 s");
+  log_props->set("logger.root", "INFO,alert1");
+  logging::LoggerConfiguration::getConfiguration().initialize(log_props);
+
+  auto verifyLogsArrived = [&] (const std::vector<std::string>& expected) {
+    std::vector<std::string> logs;
+    REQUIRE(handler.alerts_.dequeueWaitFor(logs, 1s));
+    REQUIRE(logs.size() == expected.size());
+    for (size_t idx = 0; idx < expected.size(); ++idx) {
+      bool contains = std::search(logs[idx].begin(), logs[idx].end(), 
expected[idx].begin(), expected[idx].end()) != logs[idx].end();
+      REQUIRE(contains);
+    }
+  };
+
+  harness.verify_ = [&] {
+    auto logger = logging::LoggerFactory<minifi::FlowController>::getLogger();
+    // time = 0
+    logger->log_error("not matched");
+    logger->log_error("<begin>one<end>");
+    logger->log_error("not the same but treated so <begin>one<end>");
+    logger->log_error("<begin>two<end>");
+    clock->advance(2s);
+    // time = 2
+    verifyLogsArrived({
+        "<begin>one<end>", "<begin>two<end>"
+    });
+
+    clock->advance(5s);
+    // time = 7
+    // no new logs over HTTP
+
+    logger->log_error("other <begin>one<end>");
+    logger->log_error("new log <begin>three<end>");
+    clock->advance(2s);
+
+    // time = 9
+    verifyLogsArrived({
+        "new log <begin>three<end>"
+    });
+
+    clock->advance(2s);
+    // time = 11
+    logger->log_error("other <begin>one<end>");
+    logger->log_error("new log <begin>three<end>");
+    clock->advance(2s);
+    // time = 13
+
+    verifyLogsArrived({
+        "other <begin>one<end>"
+    });
+
+    return true;
+  };
+
+  harness.run(flow_config_file.string(), dir.getPath());
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt 
b/extensions/http-curl/tests/CMakeLists.txt
index 00827fa5d..108d1e85e 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -110,3 +110,4 @@ add_test(NAME C2MetricsTest COMMAND C2MetricsTest 
"${TEST_RESOURCES}/TestC2Metri
 add_test(NAME C2EmptyMetricTest COMMAND C2EmptyMetricTest 
"${TEST_RESOURCES}/TestEmpty.yml")
 add_test(NAME C2SameProcessorMetrics COMMAND C2SameProcessorMetrics 
"${TEST_RESOURCES}/TestSameProcessorMetrics.yml")
 add_test(NAME C2DescribeMetricsTest COMMAND C2DescribeMetricsTest 
"${TEST_RESOURCES}/TestSameProcessorMetrics.yml")
+add_test(NAME AlertTests COMMAND AlertTests)
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h 
b/extensions/http-curl/tests/ServerAwareHandler.h
index 14b263bfc..3ca23d261 100644
--- a/extensions/http-curl/tests/ServerAwareHandler.h
+++ b/extensions/http-curl/tests/ServerAwareHandler.h
@@ -21,6 +21,8 @@
 #include <string>
 #include <array>
 
+#include "CivetServer.h"
+
 class ServerAwareHandler: public CivetHandler {
  protected:
   void sleep_for(std::chrono::milliseconds time) {
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 8ec537140..9bcfd218d 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF)
     set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES "src/agent/agent_docs.cpp" "src/agent/build_description.cpp" 
"src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  
"src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" 
"src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" 
"src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} 
${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" 
"src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/reposit [...]
+file(GLOB SOURCES "src/agent/agent_docs.cpp" "src/agent/build_description.cpp" 
"src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  
"src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" 
"src/core/logging/alert/*.cpp" "src/core/state/*.cpp" 
"src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" 
"src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} 
"src/core/controller/*.cpp" "src/controllers/*.cpp" 
"src/controllers/keyvalue/*.cpp" "src [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "${CMAKE_CURRENT_BINARY_DIR}/agent_version.cpp")
 
diff --git a/libminifi/include/FlowControlProtocol.h 
b/libminifi/include/FlowControlProtocol.h
index f0b762c68..b85f16a1e 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -32,7 +32,7 @@
 #include <thread>
 #include <vector>
 
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/Property.h"
 #include "properties/Configure.h"
 #include "utils/file/FileUtils.h"
diff --git a/libminifi/include/FlowFileRecord.h 
b/libminifi/include/FlowFileRecord.h
index 5ddd4c520..210fa935e 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -34,7 +34,7 @@
 #include "io/BaseStream.h"
 #include "core/FlowFile.h"
 #include "utils/TimeUtil.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "ResourceClaim.h"
 #include "Connection.h"
 #include "io/OutputStream.h"
diff --git a/libminifi/include/RemoteProcessorGroupPort.h 
b/libminifi/include/RemoteProcessorGroupPort.h
index c78413ccb..c717308c9 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -33,7 +33,7 @@
 #include "sitetosite/SiteToSiteClient.h"
 #include "io/StreamFactory.h"
 #include "controllers/SSLContextService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "utils/Export.h"
 
 namespace org::apache::nifi::minifi {
diff --git a/libminifi/include/SchedulingAgent.h 
b/libminifi/include/SchedulingAgent.h
index d3f1d6f20..f02f4773a 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -35,7 +35,7 @@
 #include "utils/ThreadPool.h"
 #include "utils/BackTrace.h"
 #include "core/Core.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "properties/Configure.h"
 #include "FlowFileRecord.h"
 #include "core/logging/Logger.h"
diff --git a/libminifi/include/ThreadedSchedulingAgent.h 
b/libminifi/include/ThreadedSchedulingAgent.h
index 8d488040d..c53249096 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -25,7 +25,7 @@
 #include <string>
 #include <chrono>
 #include "properties/Configure.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/Processor.h"
 #include "core/Repository.h"
 #include "core/ProcessContext.h"
diff --git a/libminifi/include/c2/PayloadSerializer.h 
b/libminifi/include/c2/PayloadSerializer.h
index d2fac7bf1..effd297e1 100644
--- a/libminifi/include/c2/PayloadSerializer.h
+++ b/libminifi/include/c2/PayloadSerializer.h
@@ -27,6 +27,7 @@
 #include "core/state/Value.h"
 #include "c2/C2Protocol.h"
 #include "io/BaseStream.h"
+#include "io/BufferStream.h"
 #include "utils/gsl.h"
 
 namespace org {
diff --git a/libminifi/include/controllers/LinuxPowerManagementService.h 
b/libminifi/include/controllers/LinuxPowerManagementService.h
index ed433b23a..343ddec07 100644
--- a/libminifi/include/controllers/LinuxPowerManagementService.h
+++ b/libminifi/include/controllers/LinuxPowerManagementService.h
@@ -25,7 +25,7 @@
 #include "utils/StringUtils.h"
 #include "io/validation.h"
 #include "core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "ThreadManagementService.h"
 
 namespace org::apache::nifi::minifi::controllers {
diff --git a/libminifi/include/controllers/NetworkPrioritizerService.h 
b/libminifi/include/controllers/NetworkPrioritizerService.h
index a7f049426..5a70f92e7 100644
--- a/libminifi/include/controllers/NetworkPrioritizerService.h
+++ b/libminifi/include/controllers/NetworkPrioritizerService.h
@@ -27,7 +27,7 @@
 #include "io/validation.h"
 #include "controllers/SSLContextService.h"
 #include "core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "ThreadManagementService.h"
 #include "io/NetworkPrioritizer.h"
 #include "utils/Export.h"
diff --git a/libminifi/include/controllers/SSLContextService.h 
b/libminifi/include/controllers/SSLContextService.h
index ae88013c3..aa165ab12 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -38,7 +38,7 @@
 #include "utils/tls/ExtendedKeyUsage.h"
 #include "io/validation.h"
 #include "../core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "utils/Export.h"
 #include "utils/tls/CertificateUtils.h"
 
diff --git a/libminifi/include/controllers/ThreadManagementService.h 
b/libminifi/include/controllers/ThreadManagementService.h
index 7af8e744c..b61d77234 100644
--- a/libminifi/include/controllers/ThreadManagementService.h
+++ b/libminifi/include/controllers/ThreadManagementService.h
@@ -24,7 +24,7 @@
 #include "utils/StringUtils.h"
 #include "io/validation.h"
 #include "core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/include/controllers/UpdatePolicyControllerService.h 
b/libminifi/include/controllers/UpdatePolicyControllerService.h
index 15bd87972..2cc68b1f3 100644
--- a/libminifi/include/controllers/UpdatePolicyControllerService.h
+++ b/libminifi/include/controllers/UpdatePolicyControllerService.h
@@ -23,7 +23,7 @@
 #include "utils/StringUtils.h"
 #include "io/validation.h"
 #include "core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/state/UpdatePolicy.h"
 #include "utils/Export.h"
 
diff --git 
a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
 
b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
index 3faaf3cc5..46e564df6 100644
--- 
a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
+++ 
b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
@@ -26,7 +26,7 @@
 #include "core/Core.h"
 #include "properties/Configure.h"
 #include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "utils/Export.h"
 
 namespace org::apache::nifi::minifi::controllers {
diff --git a/libminifi/include/core/FlowConfiguration.h 
b/libminifi/include/core/FlowConfiguration.h
index e8239034d..1c2be2e79 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -33,7 +33,7 @@
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 
 #include "core/Processor.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessGroup.h"
diff --git a/libminifi/include/core/Funnel.h b/libminifi/include/core/Funnel.h
index baae6e8a8..e7783d49d 100644
--- a/libminifi/include/core/Funnel.h
+++ b/libminifi/include/core/Funnel.h
@@ -20,7 +20,7 @@
 #include <string>
 #include <memory>
 
-#include "logging/LoggerConfiguration.h"
+#include "logging/LoggerFactory.h"
 #include "Processor.h"
 
 namespace org::apache::nifi::minifi::core {
diff --git a/libminifi/include/core/ProcessContext.h 
b/libminifi/include/core/ProcessContext.h
index fc7fb25d3..2f4c23e5c 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -33,7 +33,7 @@
 #include "core/repository/FileSystemRepository.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/controller/ControllerServiceLookup.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
 #include "ProcessorNode.h"
 #include "core/Repository.h"
diff --git a/libminifi/include/core/ProcessContextBuilder.h 
b/libminifi/include/core/ProcessContextBuilder.h
index 7ac0f616d..80260860b 100644
--- a/libminifi/include/core/ProcessContextBuilder.h
+++ b/libminifi/include/core/ProcessContextBuilder.h
@@ -32,7 +32,7 @@
 #include "core/repository/FileSystemRepository.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/controller/ControllerServiceLookup.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "ProcessContext.h"
 #include "ProcessorNode.h"
 #include "core/Repository.h"
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index 9e0a48263..29054e008 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -31,7 +31,7 @@
 #include "ProcessContext.h"
 #include "FlowFileRecord.h"
 #include "Exception.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/Deprecated.h"
 #include "FlowFile.h"
 #include "WeakReference.h"
diff --git a/libminifi/include/core/ProcessSessionReadCallback.h 
b/libminifi/include/core/ProcessSessionReadCallback.h
index 19a57c8c5..a89003b90 100644
--- a/libminifi/include/core/ProcessSessionReadCallback.h
+++ b/libminifi/include/core/ProcessSessionReadCallback.h
@@ -23,7 +23,7 @@
 #include <memory>
 #include <string>
 
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "io/BaseStream.h"
 #include "FlowFileRecord.h"
 
diff --git a/libminifi/include/core/Repository.h 
b/libminifi/include/core/Repository.h
index afe3e5750..5e265d091 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -34,7 +34,7 @@
 #include "core/ContentRepository.h"
 #include "core/SerializableComponent.h"
 #include "properties/Configure.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/Property.h"
 #include "ResourceClaim.h"
 #include "utils/TimeUtil.h"
diff --git a/libminifi/include/core/controller/StandardControllerServiceNode.h 
b/libminifi/include/core/controller/StandardControllerServiceNode.h
index 90c693a65..df65ef91a 100644
--- a/libminifi/include/core/controller/StandardControllerServiceNode.h
+++ b/libminifi/include/core/controller/StandardControllerServiceNode.h
@@ -22,7 +22,7 @@
 
 #include "core/Core.h"
 #include "ControllerServiceNode.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/ProcessGroup.h"
 
 namespace org::apache::nifi::minifi::core::controller {
diff --git 
a/libminifi/include/core/controller/StandardControllerServiceProvider.h 
b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index c9402510b..92b12890c 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -31,7 +31,7 @@
 #include "ControllerServiceNode.h"
 #include "StandardControllerServiceNode.h"
 #include "ControllerServiceProvider.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/include/core/logging/LoggerConfiguration.h 
b/libminifi/include/core/logging/LoggerConfiguration.h
index 7b314c026..b9c9197a5 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -25,6 +25,7 @@
 #include <map>
 #include <mutex>
 #include <string>
+#include <unordered_set>
 
 #include "spdlog/common.h"
 #include "spdlog/sinks/rotating_file_sink.h"
@@ -37,6 +38,8 @@
 #include "core/logging/Logger.h"
 #include "LoggerProperties.h"
 #include "internal/CompressionManager.h"
+#include "core/logging/LoggerFactory.h"
+#include "alert/AlertSink.h"
 
 class LoggerTestAccessor;
 
@@ -53,6 +56,8 @@ struct LoggerNamespace {
   std::vector<std::shared_ptr<spdlog::sinks::sink>> exported_sinks;
   std::map<std::string, std::shared_ptr<LoggerNamespace>> children;
 
+  void forEachSink(const std::function<void(const 
std::shared_ptr<spdlog::sinks::sink>&)>& op) const;
+
   LoggerNamespace()
       : level(spdlog::level::off),
         has_level(false),
@@ -96,6 +101,8 @@ class LoggerConfiguration {
     return getCompressedLog(std::chrono::milliseconds{0}, flush);
   }
 
+  void initializeAlertSinks(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<AgentIdentificationProvider>& agent_id);
+
   template<class Rep, class Period>
   static std::unique_ptr<io::InputStream> getCompressedLog(const 
std::chrono::duration<Rep, Period>& time, bool flush = false) {
     return getConfiguration().compression_manager_.getCompressedLog(time, 
flush);
@@ -109,9 +116,9 @@ class LoggerConfiguration {
   static const char *spdlog_default_pattern;
 
  protected:
-  static std::shared_ptr<internal::LoggerNamespace> 
initialize_namespaces(const std::shared_ptr<LoggerProperties> 
&logger_properties);
-  static std::shared_ptr<spdlog::logger> get_logger(std::shared_ptr<Logger> 
logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace, const 
std::string &name,
-                                                    
std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present = false);
+  static std::shared_ptr<internal::LoggerNamespace> 
initialize_namespaces(const std::shared_ptr<LoggerProperties> 
&logger_properties, const std::shared_ptr<Logger> &logger = {});
+  static std::shared_ptr<spdlog::logger> get_logger(const 
std::shared_ptr<Logger> &logger, const 
std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string 
&name,
+                                                    const 
std::shared_ptr<spdlog::formatter> &formatter, bool remove_if_present = false);
 
  private:
   std::shared_ptr<Logger> getLogger(const std::string& name, const 
std::lock_guard<std::mutex>& lock);
@@ -149,24 +156,8 @@ class LoggerConfiguration {
   std::mutex mutex;
   std::shared_ptr<LoggerImpl> logger_ = nullptr;
   std::shared_ptr<LoggerControl> controller_;
+  std::unordered_set<std::shared_ptr<AlertSink>> alert_sinks_;
   bool shorten_names_;
 };
 
-template<typename T>
-class LoggerFactory {
- public:
-  /**
-   * Gets an initialized logger for the template class.
-   */
-  static std::shared_ptr<Logger> getLogger() {
-    static std::shared_ptr<Logger> logger = 
LoggerConfiguration::getConfiguration().getLogger(core::getClassName<T>());
-    return logger;
-  }
-
-  static std::shared_ptr<Logger> getAliasedLogger(const std::string &alias) {
-    std::shared_ptr<Logger> logger = 
LoggerConfiguration::getConfiguration().getLogger(alias);
-    return logger;
-  }
-};
-
 }  // namespace org::apache::nifi::minifi::core::logging
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h 
b/libminifi/include/core/logging/LoggerFactory.h
similarity index 51%
copy from extensions/http-curl/tests/ServerAwareHandler.h
copy to libminifi/include/core/logging/LoggerFactory.h
index 14b263bfc..104d9437a 100644
--- a/extensions/http-curl/tests/ServerAwareHandler.h
+++ b/libminifi/include/core/logging/LoggerFactory.h
@@ -19,38 +19,28 @@
 #pragma once
 
 #include <string>
-#include <array>
+#include <memory>
 
-class ServerAwareHandler: public CivetHandler {
- protected:
-  void sleep_for(std::chrono::milliseconds time) {
-    std::unique_lock<std::mutex> lock(mutex_);
-    stop_signal_.wait_for(lock, time, [&] {return terminate_.load();});
-  }
-
-  bool isServerRunning() const {
-    return !terminate_.load();
-  }
+#include "core/logging/Logger.h"
+#include "core/Core.h"
 
-  virtual std::string readPayload(struct mg_connection* conn) {
-    std::string response;
-    int readBytes;
+namespace org::apache::nifi::minifi::core::logging {
 
-    std::array<char, 1024> buffer;
-    while ((readBytes = mg_read(conn, buffer.data(), buffer.size())) > 0) {
-      response.append(buffer.data(), readBytes);
-    }
-    return response;
-  }
+class LoggerFactoryBase {
+ public:
+  static std::shared_ptr<Logger> getAliasedLogger(const std::string &alias);
+};
 
+template<typename T>
+class LoggerFactory : public LoggerFactoryBase {
  public:
-  void stop() {
-    terminate_ = true;
-    stop_signal_.notify_all();
+  /**
+   * Gets an initialized logger for the template class.
+   */
+  static std::shared_ptr<Logger> getLogger() {
+    static std::shared_ptr<Logger> logger = 
getAliasedLogger(core::getClassName<T>());
+    return logger;
   }
-
- private:
-  std::mutex mutex_;
-  std::condition_variable stop_signal_;
-  std::atomic_bool terminate_{false};
 };
+
+}  // namespace org::apache::nifi::minifi::core::logging
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h 
b/libminifi/include/core/logging/Utils.h
similarity index 50%
copy from extensions/http-curl/tests/ServerAwareHandler.h
copy to libminifi/include/core/logging/Utils.h
index 14b263bfc..5f4acf2d6 100644
--- a/extensions/http-curl/tests/ServerAwareHandler.h
+++ b/libminifi/include/core/logging/Utils.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,39 +17,12 @@
 
 #pragma once
 
+#include <optional>
 #include <string>
-#include <array>
-
-class ServerAwareHandler: public CivetHandler {
- protected:
-  void sleep_for(std::chrono::milliseconds time) {
-    std::unique_lock<std::mutex> lock(mutex_);
-    stop_signal_.wait_for(lock, time, [&] {return terminate_.load();});
-  }
-
-  bool isServerRunning() const {
-    return !terminate_.load();
-  }
-
-  virtual std::string readPayload(struct mg_connection* conn) {
-    std::string response;
-    int readBytes;
+#include "spdlog/common.h"
 
-    std::array<char, 1024> buffer;
-    while ((readBytes = mg_read(conn, buffer.data(), buffer.size())) > 0) {
-      response.append(buffer.data(), readBytes);
-    }
-    return response;
-  }
+namespace org::apache::nifi::minifi::utils {
 
- public:
-  void stop() {
-    terminate_ = true;
-    stop_signal_.notify_all();
-  }
+std::optional<spdlog::level::level_enum> parse_log_level(const std::string& 
level_name);
 
- private:
-  std::mutex mutex_;
-  std::condition_variable stop_signal_;
-  std::atomic_bool terminate_{false};
-};
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/core/logging/alert/AlertSink.h 
b/libminifi/include/core/logging/alert/AlertSink.h
new file mode 100644
index 000000000..6286a686c
--- /dev/null
+++ b/libminifi/include/core/logging/alert/AlertSink.h
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "utils/RegexUtils.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+namespace org::apache::nifi::minifi::controllers {
+class SSLContextService;
+}  // namespace org::apache::nifi::minifi::controllers
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+ public:
+  AlertSink(const AlertSink&) = delete;
+  AlertSink(AlertSink&&) = delete;
+  AlertSink& operator=(const AlertSink&&) = delete;
+  AlertSink& operator=(AlertSink&&) = delete;
+
+  static std::shared_ptr<AlertSink> create(const std::string& 
prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, 
std::shared_ptr<Logger> logger);
+
+  void initialize(core::controller::ControllerServiceProvider* controller, 
std::shared_ptr<AgentIdentificationProvider> agent_id);
+
+  ~AlertSink() override;
+
+ private:
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    utils::Regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  class LogBuffer {
+    friend class AlertSink;
+   public:
+    static LogBuffer allocate(size_t size);
+    LogBuffer commit();
+    [[nodiscard]]
+    size_t size() const;
+   private:
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;
+  };
+
+  class LiveLogSet {
+    using Hash = size_t;
+    const std::chrono::milliseconds lifetime_{};
+    std::unordered_set<Hash> hashes_to_ignore_;
+    std::deque<std::pair<std::chrono::milliseconds, Hash>> timestamped_hashes_;
+   public:
+    explicit LiveLogSet(std::chrono::milliseconds lifetime): 
lifetime_(lifetime) {}
+
+    bool tryAdd(std::chrono::milliseconds now, Hash hash);
+  };
+
+  AlertSink(Config config, std::shared_ptr<Logger> logger);
+
+  void run();
+  void send(Services& services);
+
+  void sink_it_(const spdlog::details::log_msg& msg) override;
+  void flush_() override;
+
+  Config config_;
+  LiveLogSet live_logs_;
+
+  std::atomic_bool running_{true};
+  std::mutex mtx_;
+  std::chrono::milliseconds next_flush_;
+  std::condition_variable cv_;
+  std::thread flush_thread_;
+
+  utils::StagingQueue<LogBuffer> buffer_;
+
+  std::shared_ptr<utils::timeutils::Clock> clock_ = 
utils::timeutils::getClock();
+  std::atomic<gsl::owner<Services*>> services_{nullptr};
+
+  std::shared_ptr<Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::core::logging
diff --git 
a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h 
b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
index e9a1083da..f1c3e31bd 100644
--- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
+++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
@@ -27,7 +27,7 @@
 #include "core/ProcessSession.h"
 #include "RemoteProcessorGroupPort.h"
 #include "io/StreamFactory.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 
 namespace org::apache::nifi::minifi::core::reporting {
 
diff --git a/libminifi/include/core/repository/FileSystemRepository.h 
b/libminifi/include/core/repository/FileSystemRepository.h
index c2b8c7505..01e231c2a 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -24,7 +24,7 @@
 #include "core/Core.h"
 #include "../ContentRepository.h"
 #include "properties/Configure.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 namespace org {
 namespace apache {
 namespace nifi {
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h 
b/libminifi/include/core/repository/VolatileContentRepository.h
index 453b1d1ff..52122ebad 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -29,7 +29,7 @@
 #include "core/repository/VolatileRepository.h"
 #include "properties/Configure.h"
 #include "core/Connectable.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "utils/GeneralUtils.h"
 
 namespace org {
diff --git a/libminifi/include/core/state/UpdateController.h 
b/libminifi/include/core/state/UpdateController.h
index 496b523a2..4fc93883f 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -25,6 +25,7 @@
 #include <map>
 #include "utils/ThreadPool.h"
 #include "utils/BackTrace.h"
+#include "io/InputStream.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/include/core/yaml/CheckRequiredField.h 
b/libminifi/include/core/yaml/CheckRequiredField.h
index 0967a723b..2f0e4b0eb 100644
--- a/libminifi/include/core/yaml/CheckRequiredField.h
+++ b/libminifi/include/core/yaml/CheckRequiredField.h
@@ -21,7 +21,7 @@
 #include <memory>
 #include <vector>
 
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "yaml-cpp/yaml.h"
 
 namespace org {
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h 
b/libminifi/include/core/yaml/YamlConfiguration.h
index bd9e01e20..18ff1ee19 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -24,7 +24,7 @@
 #include <unordered_set>
 
 #include "core/FlowConfiguration.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/ProcessorConfig.h"
 #include "Exception.h"
 #include "io/StreamFactory.h"
diff --git a/libminifi/include/core/yaml/YamlConnectionParser.h 
b/libminifi/include/core/yaml/YamlConnectionParser.h
index 6c906eabe..e1c544d77 100644
--- a/libminifi/include/core/yaml/YamlConnectionParser.h
+++ b/libminifi/include/core/yaml/YamlConnectionParser.h
@@ -22,7 +22,7 @@
 #include <string>
 
 #include "core/ProcessGroup.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 
 #include "yaml-cpp/yaml.h"
 #include "utils/gsl.h"
diff --git a/libminifi/include/io/AtomicEntryStream.h 
b/libminifi/include/io/AtomicEntryStream.h
index f4f23c0f1..dc3dc0581 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -27,7 +27,7 @@
 #include "BaseStream.h"
 #include "core/repository/AtomicRepoEntries.h"
 #include "Exception.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "utils/gsl.h"
 
 namespace org {
diff --git a/libminifi/include/io/DescriptorStream.h 
b/libminifi/include/io/DescriptorStream.h
index 60eb81ca8..e0e9bb224 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -25,7 +25,7 @@
 #include <string>
 #include "EndianCheck.h"
 #include "BaseStream.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/include/io/FileStream.h 
b/libminifi/include/io/FileStream.h
index da8f2263d..754d3ab59 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -23,7 +23,7 @@
 #include <fstream>
 #include <string>
 #include "BaseStream.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h 
b/libminifi/include/io/tls/SecureDescriptorStream.h
index fbef71de9..eb2dd8ddd 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -32,7 +32,7 @@
 #include <unistd.h>
 #endif
 
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "io/BaseStream.h"
 #include "io/EndianCheck.h"
 
diff --git a/libminifi/include/properties/Configuration.h 
b/libminifi/include/properties/Configuration.h
index c4c6abd9b..906bdf0c2 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -154,6 +154,16 @@ class Configuration : public Properties {
   static constexpr const char *nifi_log_compression_cached_log_max_size = 
"nifi.log.compression.cached.log.max.size";
   static constexpr const char *nifi_log_compression_compressed_log_max_size = 
"nifi.log.compression.compressed.log.max.size";
 
+  // alert options
+  static constexpr const char *nifi_log_alert_url = "nifi.log.alert.url";
+  static constexpr const char *nifi_log_alert_ssl_context_service = 
"nifi.log.alert.ssl.context.service";
+  static constexpr const char *nifi_log_alert_batch_size = 
"nifi.log.alert.batch.size";
+  static constexpr const char *nifi_log_alert_flush_period = 
"nifi.log.alert.flush.period";
+  static constexpr const char *nifi_log_alert_filter = "nifi.log.alert.filter";
+  static constexpr const char *nifi_log_alert_rate_limit = 
"nifi.log.alert.rate.limit";
+  static constexpr const char *nifi_log_alert_buffer_limit = 
"nifi.log.alert.buffer.limit";
+  static constexpr const char *nifi_log_alert_level = "nifi.log.alert.level";
+
   static constexpr const char *nifi_asset_directory = "nifi.asset.directory";
 
   // Metrics publisher options
diff --git a/libminifi/include/properties/Properties.h 
b/libminifi/include/properties/Properties.h
index 2a8325d70..ca227d145 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -49,7 +49,7 @@ class Properties {
   };
 
  public:
-  explicit Properties(const std::string& name = "");
+  explicit Properties(std::string name = "");
 
   virtual ~Properties() = default;
 
diff --git a/libminifi/include/provenance/Provenance.h 
b/libminifi/include/provenance/Provenance.h
index 7941b5465..3eab418ae 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -36,7 +36,7 @@
 #include "properties/Configure.h"
 #include "Connection.h"
 #include "FlowFileRecord.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "ResourceClaim.h"
 #include "utils/gsl.h"
 #include "utils/Id.h"
diff --git a/libminifi/include/sitetosite/Peer.h 
b/libminifi/include/sitetosite/Peer.h
index 24f525cc9..a35b416f9 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -27,7 +27,7 @@
 #include <string>
 #include <utility>
 
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/Property.h"
 #include "io/BaseStream.h"
 #include "io/ClientSocket.h"
diff --git a/libminifi/include/sitetosite/RawSocketProtocol.h 
b/libminifi/include/sitetosite/RawSocketProtocol.h
index 6bc9c566f..b97e32659 100644
--- a/libminifi/include/sitetosite/RawSocketProtocol.h
+++ b/libminifi/include/sitetosite/RawSocketProtocol.h
@@ -32,7 +32,7 @@
 #include <utility>
 #include <vector>
 
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Property.h"
diff --git a/libminifi/include/utils/ByteArrayCallback.h 
b/libminifi/include/utils/ByteArrayCallback.h
index 503589300..8cd339d01 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -24,7 +24,7 @@
 
 #include "concurrentqueue.h"
 #include "FlowFileRecord.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::utils {
diff --git a/libminifi/include/utils/FlowFileQueue.h 
b/libminifi/include/utils/FlowFileQueue.h
index 952d3fc12..cebd939d6 100644
--- a/libminifi/include/utils/FlowFileQueue.h
+++ b/libminifi/include/utils/FlowFileQueue.h
@@ -104,7 +104,7 @@ class FlowFileQueue {
   std::optional<LoadTask> load_task_;
   MinMaxHeap<value_type, FlowFilePenaltyExpirationComparator> queue_;
 
-  std::unique_ptr<timeutils::SteadyClock> 
clock_{std::make_unique<timeutils::SteadyClock>()};
+  std::shared_ptr<timeutils::SteadyClock> clock_{timeutils::getClock()};
 
   std::shared_ptr<core::logging::Logger> logger_;
 };
diff --git a/libminifi/include/utils/Hash.h b/libminifi/include/utils/Hash.h
new file mode 100644
index 000000000..0afd33692
--- /dev/null
+++ b/libminifi/include/utils/Hash.h
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+
+namespace org::apache::nifi::minifi::utils {
+
+// from the boost hash_combine docs
+inline size_t hash_combine(size_t seed, size_t new_hash) noexcept {
+  return seed ^ (new_hash + 0x9e3779b9 + (seed << 6U) + (seed >> 2U));
+}
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index 9cff1d76a..e4133e0aa 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -31,6 +31,7 @@ class uuid;
 #include "core/logging/Logger.h"
 #include "properties/Properties.h"
 #include "SmallString.h"
+#include "Hash.h"
 
 #define UUID_TIME_IMPL 0
 #define UUID_RANDOM_IMPL 1
@@ -141,10 +142,6 @@ struct hash<org::apache::nifi::minifi::utils::Identifier> {
   size_t operator()(const org::apache::nifi::minifi::utils::Identifier& id) 
const noexcept {
     static_assert(sizeof(org::apache::nifi::minifi::utils::Identifier) % 
sizeof(size_t) == 0);
     constexpr int slices = 
sizeof(org::apache::nifi::minifi::utils::Identifier) / sizeof(size_t);
-    const auto combine = [](size_t& seed, size_t new_hash) {
-      // from the boost hash_combine docs
-      seed ^= new_hash + 0x9e3779b9 + (seed << 6) + (seed >> 2);
-    };
     const auto get_slice = [](const 
org::apache::nifi::minifi::utils::Identifier& id, size_t idx) -> size_t {
       size_t result{};
       memcpy(&result, reinterpret_cast<const unsigned char*>(&id.data_) + idx 
* sizeof(size_t), sizeof(size_t));
@@ -152,7 +149,7 @@ struct hash<org::apache::nifi::minifi::utils::Identifier> {
     };
     size_t hash = get_slice(id, 0);
     for (size_t i = 1; i < slices; ++i) {
-      combine(hash, get_slice(id, i));
+      hash = org::apache::nifi::minifi::utils::hash_combine(hash, 
get_slice(id, i));
     }
     return hash;
   }
diff --git a/libminifi/include/utils/ListingStateManager.h 
b/libminifi/include/utils/ListingStateManager.h
index 0ca25e161..d97e1492e 100644
--- a/libminifi/include/utils/ListingStateManager.h
+++ b/libminifi/include/utils/ListingStateManager.h
@@ -27,7 +27,7 @@
 
 #include "core/CoreComponentState.h"
 #include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 
 namespace org::apache::nifi::minifi::utils {
 
diff --git a/libminifi/include/utils/TestUtils.h 
b/libminifi/include/utils/TestUtils.h
index 37e1543f3..99b964037 100644
--- a/libminifi/include/utils/TestUtils.h
+++ b/libminifi/include/utils/TestUtils.h
@@ -22,6 +22,7 @@
 #include <fstream>
 #include <memory>
 #include <string>
+#include <unordered_set>
 
 #include "utils/file/FileUtils.h"
 #include "utils/Environment.h"
@@ -60,34 +61,52 @@ Identifier generateUUID() {
   return id_generator->generate();
 }
 
-class ManualClock : public timeutils::Clock {
+class ManualClock : public timeutils::SteadyClock {
  public:
-  [[nodiscard]] std::chrono::milliseconds timeSinceEpoch() const override { 
return time_; }
-  void advance(std::chrono::milliseconds elapsed_time) { time_ += 
elapsed_time; }
+  [[nodiscard]] std::chrono::milliseconds timeSinceEpoch() const override {
+    std::lock_guard lock(mtx_);
+    return time_;
+  }
 
- private:
-  std::chrono::milliseconds time_{0};
-};
+  [[nodiscard]] std::chrono::time_point<std::chrono::steady_clock> now() const 
override {
+    return std::chrono::steady_clock::time_point{timeSinceEpoch()};
+  }
 
-class ManualSteadyClock : public timeutils::SteadyClock {
- public:
-  std::chrono::milliseconds timeSinceEpoch() const override { return time_; }
   void advance(std::chrono::milliseconds elapsed_time) {
     if (elapsed_time.count() < 0) {
       throw std::logic_error("A steady clock can only be advanced forward");
     }
+    std::lock_guard lock(mtx_);
     time_ += elapsed_time;
+    for (auto* cv : cvs_) {
+      cv->notify_all();
+    }
   }
 
-  std::chrono::steady_clock::time_point now() const override {
-    return std::chrono::steady_clock::time_point{time_};
+  bool wait_until(std::condition_variable& cv, std::unique_lock<std::mutex>& 
lck, std::chrono::milliseconds time, const std::function<bool()>& pred) 
override {
+    std::chrono::milliseconds now;
+    {
+      std::unique_lock lock(mtx_);
+      now = time_;
+      cvs_.insert(&cv);
+    }
+    cv.wait_for(lck, time - now, [&] {
+      now = timeSinceEpoch();
+      return now >= time || pred();
+    });
+    {
+      std::unique_lock lock(mtx_);
+      cvs_.erase(&cv);
+    }
+    return pred();
   }
 
  private:
+  mutable std::mutex mtx_;
+  std::unordered_set<std::condition_variable*> cvs_;
   std::chrono::milliseconds time_{0};
 };
 
-
 #ifdef WIN32
 // The tzdata location is set as a global variable in date-tz library
 // We need to set it from from libminifi to effect calls made from libminifi 
(on Windows)
diff --git a/libminifi/include/utils/TimeUtil.h 
b/libminifi/include/utils/TimeUtil.h
index 906d851fe..9e3038a6c 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -29,6 +29,8 @@
 #include <optional>
 #include <functional>
 #include <algorithm>
+#include <condition_variable>
+#include <memory>
 
 // libc++ doesn't define operator<=> on durations, and apparently the operator 
rewrite rules don't automagically make one
 #if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
@@ -70,6 +72,9 @@ class Clock {
  public:
   virtual ~Clock() = default;
   virtual std::chrono::milliseconds timeSinceEpoch() const = 0;
+  virtual bool wait_until(std::condition_variable& cv, 
std::unique_lock<std::mutex>& lck, std::chrono::milliseconds time, const 
std::function<bool()>& pred) {
+    return cv.wait_for(lck, time - timeSinceEpoch(), pred);
+  }
 };
 
 class SystemClock : public Clock {
@@ -90,6 +95,11 @@ class SteadyClock : public Clock {
   }
 };
 
+std::shared_ptr<SteadyClock> getClock();
+
+// test-only utility to specify what clock to use
+void setClock(std::shared_ptr<SteadyClock> clock);
+
 inline std::string getTimeStr(std::chrono::system_clock::time_point tp) {
   std::ostringstream stream;
   date::to_stream(stream, TIME_FORMAT, 
std::chrono::floor<std::chrono::milliseconds>(tp));
diff --git a/libminifi/include/utils/file/FileSystem.h 
b/libminifi/include/utils/file/FileSystem.h
index a6cafc0cf..cc873accb 100644
--- a/libminifi/include/utils/file/FileSystem.h
+++ b/libminifi/include/utils/file/FileSystem.h
@@ -21,7 +21,7 @@
 #include <optional>
 #include <string>
 #include "utils/crypto/EncryptionProvider.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/include/utils/file/FileUtils.h 
b/libminifi/include/utils/file/FileUtils.h
index fa8f02a64..d1b582840 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -70,7 +70,7 @@
 
 #endif
 
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 #include "utils/StringUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/gsl.h"
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index b471bf202..ccd8fcab9 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -124,6 +124,14 @@ const std::vector<core::ConfigurationProperty> 
Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_log_logger_root},
   
core::ConfigurationProperty{Configuration::nifi_log_compression_cached_log_max_size,
 gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
   
core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size,
 gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
+  core::ConfigurationProperty{Configuration::nifi_log_alert_url},
+  
core::ConfigurationProperty{Configuration::nifi_log_alert_ssl_context_service},
+  core::ConfigurationProperty{Configuration::nifi_log_alert_batch_size},
+  core::ConfigurationProperty{Configuration::nifi_log_alert_flush_period},
+  core::ConfigurationProperty{Configuration::nifi_log_alert_filter},
+  core::ConfigurationProperty{Configuration::nifi_log_alert_rate_limit},
+  core::ConfigurationProperty{Configuration::nifi_log_alert_buffer_limit},
+  core::ConfigurationProperty{Configuration::nifi_log_alert_level},
   core::ConfigurationProperty{Configuration::nifi_asset_directory},
   core::ConfigurationProperty{Configuration::nifi_metrics_publisher_class},
   
core::ConfigurationProperty{Configuration::nifi_metrics_publisher_prometheus_metrics_publisher_port,
 gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())},
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 2339e968c..79923a69f 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -376,6 +376,7 @@ int16_t FlowController::start() {
         this->root_->startProcessing(timer_scheduler_, event_scheduler_, 
cron_scheduler_);
       }
       C2Client::initialize(this, this, this);
+      
core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this,
 configuration_);
       running_ = true;
       this->protocol_->start();
       this->provenance_repo_->start();
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp 
b/libminifi/src/core/logging/LoggerConfiguration.cpp
index 10a53a036..770fcf6f7 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -36,8 +36,10 @@
 #include "utils/file/FileUtils.h"
 #include "utils/Environment.h"
 #include "core/logging/internal/LogCompressorSink.h"
+#include "core/logging/alert/AlertSink.h"
 #include "utils/Literals.h"
 #include "core/TypedValues.h"
+#include "core/logging/Utils.h"
 
 #include "spdlog/spdlog.h"
 #include "spdlog/sinks/stdout_sinks.h"
@@ -60,26 +62,21 @@ namespace org::apache::nifi::minifi::core::logging {
 
 const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d 
%H:%M:%S.%e] [%n] [%l] %v";
 
-namespace {
-std::optional<spdlog::level::level_enum> parse_log_level(const std::string& 
level_name) {
-  if (utils::StringUtils::equalsIgnoreCase(level_name, "trace")) {
-    return spdlog::level::trace;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "debug")) {
-    return spdlog::level::debug;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "info")) {
-    return spdlog::level::info;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "warn")) {
-    return spdlog::level::warn;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "error")) {
-    return spdlog::level::err;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "critical")) {
-    return spdlog::level::critical;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "off")) {
-    return spdlog::level::off;
+namespace internal {
+
+void LoggerNamespace::forEachSink(const std::function<void(const 
std::shared_ptr<spdlog::sinks::sink>&)>& op) const {
+  for (auto& sink : sinks) {
+    op(sink);
+  }
+  for (auto& sink : exported_sinks) {
+    op(sink);
+  }
+  for (auto& [name, child] : children) {
+    child->forEachSink(op);
   }
-  return std::nullopt;
 }
-}  // namespace
+
+}  // namespace internal
 
 std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string 
&type) {
   std::vector<std::string> appenders;
@@ -108,7 +105,13 @@ LoggerConfiguration& 
LoggerConfiguration::getConfiguration() {
 
 void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> 
&logger_properties) {
   std::lock_guard<std::mutex> lock(mutex);
-  root_namespace_ = initialize_namespaces(logger_properties);
+  root_namespace_ = initialize_namespaces(logger_properties, logger_);
+  alert_sinks_.clear();
+  root_namespace_->forEachSink([&] (const 
std::shared_ptr<spdlog::sinks::sink>& sink) {
+    if (auto alert_sink = std::dynamic_pointer_cast<AlertSink>(sink)) {
+      alert_sinks_.insert(std::move(alert_sink));
+    }
+  });
   initializeCompression(lock, logger_properties);
   std::string spdlog_pattern;
   if (!logger_properties->getString("spdlog.pattern", spdlog_pattern)) {
@@ -163,7 +166,7 @@ std::shared_ptr<spdlog::logger> 
LoggerConfiguration::getSpdlogLogger(const std::
   return spdlog::get(name);
 }
 
-std::shared_ptr<internal::LoggerNamespace> 
LoggerConfiguration::initialize_namespaces(const 
std::shared_ptr<LoggerProperties> &logger_properties) {
+std::shared_ptr<internal::LoggerNamespace> 
LoggerConfiguration::initialize_namespaces(const 
std::shared_ptr<LoggerProperties> &logger_properties, const 
std::shared_ptr<Logger> &logger) {
   std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sink_map = 
logger_properties->initial_sinks();
 
   std::string appender_type = "appender";
@@ -185,6 +188,10 @@ std::shared_ptr<internal::LoggerNamespace> 
LoggerConfiguration::initialize_names
       sink_map[appender_name] = 
std::make_shared<spdlog::sinks::stderr_sink_mt>();
     } else if ("syslog" == appender_type) {
       sink_map[appender_name] = LoggerConfiguration::create_syslog_sink();
+    } else if ("alert" == appender_type) {
+      if (auto sink = AlertSink::create(appender_key, logger_properties, 
logger)) {
+        sink_map[appender_name] = sink;
+      }
     } else {
       sink_map[appender_name] = LoggerConfiguration::create_fallback_sink();
     }
@@ -204,12 +211,16 @@ std::shared_ptr<internal::LoggerNamespace> 
LoggerConfiguration::initialize_names
       std::string level_name = utils::StringUtils::trim(segment);
       if (first) {
         first = false;
-        auto opt_level = parse_log_level(level_name);
+        auto opt_level = utils::parse_log_level(level_name);
         if (opt_level) {
           level = *opt_level;
         }
       } else {
-        sinks.push_back(sink_map[level_name]);
+        if (auto it = sink_map.find(level_name); it != sink_map.end()) {
+          sinks.push_back(it->second);
+        } else {
+          logger->log_error("Couldn't find sink '%s'", level_name);
+        }
       }
     }
     std::shared_ptr<internal::LoggerNamespace> current_namespace = 
root_namespace;
@@ -233,8 +244,8 @@ std::shared_ptr<internal::LoggerNamespace> 
LoggerConfiguration::initialize_names
   return root_namespace;
 }
 
-std::shared_ptr<spdlog::logger> 
LoggerConfiguration::get_logger(std::shared_ptr<Logger> logger, const 
std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string 
&name,
-                                                                
std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present) {
+std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(const 
std::shared_ptr<Logger> &logger, const 
std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string 
&name,
+                                                                const 
std::shared_ptr<spdlog::formatter> &formatter, bool remove_if_present) {
   std::shared_ptr<spdlog::logger> spdlogger = spdlog::get(name);
   if (spdlogger) {
     if (remove_if_present) {
@@ -316,6 +327,13 @@ void LoggerConfiguration::initializeCompression(const 
std::lock_guard<std::mutex
   }
 }
 
+void 
LoggerConfiguration::initializeAlertSinks(core::controller::ControllerServiceProvider*
 controller, const std::shared_ptr<AgentIdentificationProvider>& agent_id) {
+  std::lock_guard guard(mutex);
+  for (auto& sink : alert_sinks_) {
+    sink->initialize(controller, agent_id);
+  }
+}
+
 std::shared_ptr<spdlog::sinks::rotating_file_sink_mt> 
LoggerConfiguration::getRotatingFileSink(const std::string& appender_key, const 
std::shared_ptr<LoggerProperties>& properties) {
   // According to spdlog docs, if two loggers write to the same file, they 
must use the same sink object.
   // Note that some logging configuration changes will not take effect until 
MiNiFi is restarted.
diff --git a/libminifi/src/core/logging/LoggerFactory.cpp 
b/libminifi/src/core/logging/LoggerFactory.cpp
new file mode 100644
index 000000000..b7b7f591b
--- /dev/null
+++ b/libminifi/src/core/logging/LoggerFactory.cpp
@@ -0,0 +1,28 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/LoggerFactory.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+std::shared_ptr<Logger> LoggerFactoryBase::getAliasedLogger(const std::string 
&alias) {
+  return LoggerConfiguration::getConfiguration().getLogger(alias);
+}
+
+}  // namespace org::apache::nifi::minifi::core::logging
diff --git a/libminifi/src/core/logging/Utils.cpp 
b/libminifi/src/core/logging/Utils.cpp
new file mode 100644
index 000000000..68dc2c671
--- /dev/null
+++ b/libminifi/src/core/logging/Utils.cpp
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/Utils.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+std::optional<spdlog::level::level_enum> parse_log_level(const std::string& 
level_name) {
+  if (utils::StringUtils::equalsIgnoreCase(level_name, "trace")) {
+    return spdlog::level::trace;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "debug")) {
+    return spdlog::level::debug;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "info")) {
+    return spdlog::level::info;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "warn")) {
+    return spdlog::level::warn;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "error")) {
+    return spdlog::level::err;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "critical")) {
+    return spdlog::level::critical;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "off")) {
+    return spdlog::level::off;
+  }
+  return std::nullopt;
+}
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/core/logging/alert/AlertSink.cpp 
b/libminifi/src/core/logging/alert/AlertSink.cpp
new file mode 100644
index 000000000..726c8b995
--- /dev/null
+++ b/libminifi/src/core/logging/alert/AlertSink.cpp
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& 
prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, 
std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be 
available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + 
".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be 
available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be 
available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + 
suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", 
prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", 
prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", 
TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", 
TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 
MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + 
".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), 
std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* 
controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", 
config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = 
controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = 
std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", 
config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", 
config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  // TODO(adebreceni): revisit this after MINIFICPP-1903
+  std::string payload(msg.payload.data(), msg.payload.size());
+  utils::SMatch match;
+  if (!utils::regexMatch(payload, match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string submatch = match[idx].str();
+    hash = utils::hash_combine(hash, std::hash<std::string>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), 
formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      if (clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;})) 
{
+        break;
+      }
+      next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something 
else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+  delete services_.exchange(nullptr);
+}
+
+void AlertSink::send(Services& services) {
+  LogBuffer logs;
+  buffer_.commit();
+  if (!buffer_.tryDequeue(logs)) {
+    return;
+  }
+
+  auto client = 
core::ClassLoader::getDefaultClassLoader().instantiate<utils::BaseHTTPClient>("HTTPClient",
 "HTTPClient");
+  if (!client) {
+    logger_->log_error("Could not instantiate a HTTPClient object");
+    return;
+  }
+  client->initialize("PUT", config_.url, services.ssl_service);
+
+  rapidjson::Document doc(rapidjson::kObjectType);
+  std::string agent_id = services.agent_id->getAgentIdentifier();
+  doc.AddMember("agentId", rapidjson::Value(agent_id.data(), 
agent_id.length()), doc.GetAllocator());
+  doc.AddMember("alerts", rapidjson::Value(rapidjson::kArrayType), 
doc.GetAllocator());
+  for (const auto& [log, _] : logs.data_) {
+    doc["alerts"].PushBack(rapidjson::Value(log.data(), log.size()), 
doc.GetAllocator());
+  }
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  doc.Accept(writer);
+
+  auto data_input = std::make_unique<utils::ByteInputCallback>();
+  auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+  data_input->write(std::string(buffer.GetString(), buffer.GetSize()));
+  data_cb->ptr = data_input.get();
+  client->setUploadCallback(data_cb.get());
+  client->setContentType("application/json");
+
+  bool req_success = client->submit();
+
+  int64_t resp_code = client->getResponseCode();
+  const bool response_success = 200 <= resp_code && resp_code < 300;
+  const bool client_err = 400 <= resp_code && resp_code < 500;
+  const bool server_err = 500 <= resp_code && resp_code < 600;
+  if (client_err || server_err) {
+    logger_->log_error("Error response code '" "%" PRId64 "' from '%s'", 
resp_code, config_.url);
+  } else if (!response_success) {
+    logger_->log_warn("Non-success response code '" "%" PRId64 "' from '%s'", 
resp_code, config_.url);
+  } else {
+    logger_->log_debug("Response code '" "%" PRId64 "' from '%s'", resp_code, 
config_.url);
+  }
+
+  if (!req_success) {
+    logger_->log_error("Failed to send alert request");
+  }
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::allocate(size_t /*size*/) {
+  return {};
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::commit() {
+  return std::move(*this);
+}
+
+size_t AlertSink::LogBuffer::size() const {
+  return size_;
+}
+
+bool AlertSink::LiveLogSet::tryAdd(std::chrono::milliseconds now, size_t hash) 
{
+  auto limit = now - lifetime_;
+  while (!timestamped_hashes_.empty() && timestamped_hashes_.front().first < 
limit) {
+    hashes_to_ignore_.erase(timestamped_hashes_.front().second);
+    timestamped_hashes_.pop_front();
+  }
+
+  if (!hashes_to_ignore_.insert(hash).second) {
+    return false;
+  }
+
+  timestamped_hashes_.emplace_back(now, hash);
+  return true;
+}
+
+}  // namespace org::apache::nifi::minifi::core::logging
diff --git a/libminifi/src/properties/Properties.cpp 
b/libminifi/src/properties/Properties.cpp
index b8d127a88..55a930c02 100644
--- a/libminifi/src/properties/Properties.cpp
+++ b/libminifi/src/properties/Properties.cpp
@@ -25,16 +25,11 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "properties/PropertiesFile.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
-#define TRACE_BUFFER_SIZE 512
-
-Properties::Properties(const std::string& name)
+Properties::Properties(std::string name)
     : logger_(core::logging::LoggerFactory<Properties>::getLogger()),
-    name_(name) {
+    name_(std::move(name)) {
 }
 
 // Get the config value
@@ -158,7 +153,4 @@ std::map<std::string, std::string> 
Properties::getProperties() const {
   return properties;
 }
 
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp
index a62c98a9f..7d283a888 100644
--- a/libminifi/src/utils/Id.cpp
+++ b/libminifi/src/utils/Id.cpp
@@ -32,7 +32,7 @@
 #include <memory>
 #include <string>
 #include <limits>
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
 
 #ifdef WIN32
 #include "Rpc.h"
diff --git a/libminifi/src/utils/TimeUtil.cpp b/libminifi/src/utils/TimeUtil.cpp
new file mode 100644
index 000000000..04df8d6fb
--- /dev/null
+++ b/libminifi/src/utils/TimeUtil.cpp
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/TimeUtil.h"
+
+namespace org::apache::nifi::minifi::utils::timeutils {
+
+static std::mutex global_clock_mtx;
+static std::shared_ptr<SteadyClock> 
global_clock{std::make_shared<SteadyClock>()};
+
+std::shared_ptr<SteadyClock> getClock() {
+  std::lock_guard lock(global_clock_mtx);
+  return global_clock;
+}
+
+// test-only utility to specify what clock to use
+void setClock(std::shared_ptr<SteadyClock> clock) {
+  std::lock_guard lock(global_clock_mtx);
+  global_clock = std::move(clock);
+}
+
+}  // namespace org::apache::nifi::minifi::utils::timeutils
diff --git a/libminifi/test/unit/SwapTestController.h 
b/libminifi/test/unit/SwapTestController.h
index 0a921a754..f6650d2ee 100644
--- a/libminifi/test/unit/SwapTestController.h
+++ b/libminifi/test/unit/SwapTestController.h
@@ -166,10 +166,8 @@ struct VerifiedQueue {
     return result;
   }
 
-  VerifiedQueue(std::shared_ptr<minifi::SwapManager> swap_manager, 
std::unique_ptr<utils::timeutils::SteadyClock> clock)
-    : impl(std::move(swap_manager)) {
-    FlowFileQueueTestAccessor::get_clock_(impl) = std::move(clock);
-  }
+  explicit VerifiedQueue(std::shared_ptr<minifi::SwapManager> swap_manager)
+    : impl(std::move(swap_manager)) {}
 
   utils::FlowFileQueue impl;
   FlowFilePtrVec ref_;
@@ -181,9 +179,9 @@ class SwapTestController : public TestController {
     content_repo_ = 
std::make_shared<core::repository::VolatileContentRepository>();
     flow_repo_ = std::make_shared<SwappingFlowFileTestRepo>();
     flow_repo_->loadComponent(content_repo_);
-    auto clock = std::make_unique<utils::ManualSteadyClock>();
-    clock_ = clock.get();
-    queue_ = 
std::make_shared<VerifiedQueue>(std::static_pointer_cast<minifi::SwapManager>(flow_repo_),
 std::move(clock));
+    clock_ = std::make_shared<utils::ManualClock>();
+    utils::timeutils::setClock(clock_);
+    queue_ = 
std::make_shared<VerifiedQueue>(std::static_pointer_cast<minifi::SwapManager>(flow_repo_));
   }
 
   void setLimits(size_t min_size, size_t target_size, size_t max_size) {
@@ -235,6 +233,5 @@ class SwapTestController : public TestController {
   std::shared_ptr<SwappingFlowFileTestRepo> flow_repo_;
   std::shared_ptr<core::repository::VolatileContentRepository> content_repo_;
   std::shared_ptr<VerifiedQueue> queue_;
-  // owned by the queue_
-  utils::ManualSteadyClock* clock_;
+  std::shared_ptr<utils::ManualClock> clock_;
 };

Reply via email to