This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 922431faf MINIFICPP-1822 Add c2 alert capability
922431faf is described below
commit 922431fafe5e44d92072c99ca0ce70539bb71efc
Author: Adam Debreceni <[email protected]>
AuthorDate: Fri Aug 26 10:49:28 2022 +0200
MINIFICPP-1822 Add c2 alert capability
Closes #1367
Signed-off-by: Marton Szasz <[email protected]>
---
conf/minifi-log.properties | 11 +
extensions/http-curl/tests/AlertTests.cpp | 148 ++++++++++++
extensions/http-curl/tests/CMakeLists.txt | 1 +
extensions/http-curl/tests/ServerAwareHandler.h | 2 +
libminifi/CMakeLists.txt | 2 +-
libminifi/include/FlowControlProtocol.h | 2 +-
libminifi/include/FlowFileRecord.h | 2 +-
libminifi/include/RemoteProcessorGroupPort.h | 2 +-
libminifi/include/SchedulingAgent.h | 2 +-
libminifi/include/ThreadedSchedulingAgent.h | 2 +-
libminifi/include/c2/PayloadSerializer.h | 1 +
.../controllers/LinuxPowerManagementService.h | 2 +-
.../controllers/NetworkPrioritizerService.h | 2 +-
libminifi/include/controllers/SSLContextService.h | 2 +-
.../include/controllers/ThreadManagementService.h | 2 +-
.../controllers/UpdatePolicyControllerService.h | 2 +-
.../AbstractAutoPersistingKeyValueStoreService.h | 2 +-
libminifi/include/core/FlowConfiguration.h | 2 +-
libminifi/include/core/Funnel.h | 2 +-
libminifi/include/core/ProcessContext.h | 2 +-
libminifi/include/core/ProcessContextBuilder.h | 2 +-
libminifi/include/core/ProcessSession.h | 2 +-
.../include/core/ProcessSessionReadCallback.h | 2 +-
libminifi/include/core/Repository.h | 2 +-
.../controller/StandardControllerServiceNode.h | 2 +-
.../controller/StandardControllerServiceProvider.h | 2 +-
.../include/core/logging/LoggerConfiguration.h | 31 +--
.../include/core/logging/LoggerFactory.h | 46 ++--
.../include/core/logging/Utils.h | 38 +--
libminifi/include/core/logging/alert/AlertSink.h | 120 +++++++++
.../reporting/SiteToSiteProvenanceReportingTask.h | 2 +-
.../include/core/repository/FileSystemRepository.h | 2 +-
.../core/repository/VolatileContentRepository.h | 2 +-
libminifi/include/core/state/UpdateController.h | 1 +
libminifi/include/core/yaml/CheckRequiredField.h | 2 +-
libminifi/include/core/yaml/YamlConfiguration.h | 2 +-
libminifi/include/core/yaml/YamlConnectionParser.h | 2 +-
libminifi/include/io/AtomicEntryStream.h | 2 +-
libminifi/include/io/DescriptorStream.h | 2 +-
libminifi/include/io/FileStream.h | 2 +-
libminifi/include/io/tls/SecureDescriptorStream.h | 2 +-
libminifi/include/properties/Configuration.h | 10 +
libminifi/include/properties/Properties.h | 2 +-
libminifi/include/provenance/Provenance.h | 2 +-
libminifi/include/sitetosite/Peer.h | 2 +-
libminifi/include/sitetosite/RawSocketProtocol.h | 2 +-
libminifi/include/utils/ByteArrayCallback.h | 2 +-
libminifi/include/utils/FlowFileQueue.h | 2 +-
libminifi/include/utils/Hash.h | 30 +++
libminifi/include/utils/Id.h | 7 +-
libminifi/include/utils/ListingStateManager.h | 2 +-
libminifi/include/utils/TestUtils.h | 43 +++-
libminifi/include/utils/TimeUtil.h | 10 +
libminifi/include/utils/file/FileSystem.h | 2 +-
libminifi/include/utils/file/FileUtils.h | 2 +-
libminifi/src/Configuration.cpp | 8 +
libminifi/src/FlowController.cpp | 1 +
libminifi/src/core/logging/LoggerConfiguration.cpp | 66 +++--
libminifi/src/core/logging/LoggerFactory.cpp | 28 +++
libminifi/src/core/logging/Utils.cpp | 42 ++++
libminifi/src/core/logging/alert/AlertSink.cpp | 268 +++++++++++++++++++++
libminifi/src/properties/Properties.cpp | 16 +-
libminifi/src/utils/Id.cpp | 2 +-
libminifi/src/utils/TimeUtil.cpp | 36 +++
libminifi/test/unit/SwapTestController.h | 15 +-
65 files changed, 877 insertions(+), 184 deletions(-)
diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties
index aeca6dd4b..db296157b 100644
--- a/conf/minifi-log.properties
+++ b/conf/minifi-log.properties
@@ -38,6 +38,17 @@ appender.rolling.max_file_size=5242880
## The syslog appender will log using syslog(3) on *nix, and to the Windows
Event Log on Windows
#appender.syslog=syslog
+# Alert appender to forward critical logs through HTTP
+#appender.alert1=alert
+#appender.alert1.url=<URL>
+#appender.alert1.filter=<regex pattern to match logs against>
+#appender.alert1.rate.limit=10 min
+#appender.alert1.flush.period=5 s
+#appender.alert1.batch.size=100 KB
+#appender.alert1.buffer.limit=1 MB
+#appender.alert1.level=TRACE
+#appender.alert1.ssl.context.service=<Name of the SSLContextService>
+
logger.root=INFO,rolling
#Logging configurable by namespace
diff --git a/extensions/http-curl/tests/AlertTests.cpp
b/extensions/http-curl/tests/AlertTests.cpp
new file mode 100644
index 000000000..2e6df680a
--- /dev/null
+++ b/extensions/http-curl/tests/AlertTests.cpp
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+ explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id))
{}
+
+ bool handlePut(CivetServer* , struct mg_connection *conn) override {
+ auto msg = readPayload(conn);
+ rapidjson::Document doc;
+ rapidjson::ParseResult res = doc.Parse(msg.c_str());
+ REQUIRE(static_cast<bool>(res));
+ REQUIRE(doc.IsObject());
+ REQUIRE(doc.HasMember("agentId"));
+ REQUIRE(doc["agentId"].IsString());
+ REQUIRE(doc.HasMember("alerts"));
+ REQUIRE(doc["alerts"].IsArray());
+ REQUIRE(doc["alerts"].Size() > 0);
+ std::string id(doc["agentId"].GetString(),
doc["agentId"].GetStringLength());
+ REQUIRE(id == agent_id_);
+ std::vector<std::string> batch;
+ for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+ REQUIRE(doc["alerts"][i].IsString());
+ batch.emplace_back(doc["alerts"][i].GetString(),
doc["alerts"][i].GetStringLength());
+ }
+ alerts_.enqueue(std::move(batch));
+ return true;
+ }
+
+ std::string agent_id_;
+ utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+ void testSetup() override {}
+
+ void runAssertions() override {
+ verify_();
+ }
+
+ std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+ auto clock = std::make_shared<utils::ManualClock>();
+ utils::timeutils::setClock(clock);
+
+ TempDirectory dir;
+ auto flow_config_file = std::filesystem::path(dir.getPath()) / "config.yml";
+ std::ofstream(flow_config_file) << empty_flow;
+
+ std::string agent_id = "test-agent-1";
+ VerifyAlerts harness;
+ AlertHandler handler(agent_id);
+ harness.setUrl("http://localhost:0/api/alerts", &handler);
+
harness.getConfiguration()->set(minifi::Configuration::nifi_c2_agent_identifier,
agent_id);
+ harness.getConfiguration()->setHome(dir.getPath());
+
+ auto log_props = std::make_shared<logging::LoggerProperties>();
+ log_props->set("appender.alert1", "alert");
+ log_props->set("appender.alert1.url", harness.getC2RestUrl());
+ log_props->set("appender.alert1.filter", ".*<begin>(.*)<end>.*");
+ log_props->set("appender.alert1.rate.limit", "10 s");
+ log_props->set("appender.alert1.flush.period", "1 s");
+ log_props->set("logger.root", "INFO,alert1");
+ logging::LoggerConfiguration::getConfiguration().initialize(log_props);
+
+ auto verifyLogsArrived = [&] (const std::vector<std::string>& expected) {
+ std::vector<std::string> logs;
+ REQUIRE(handler.alerts_.dequeueWaitFor(logs, 1s));
+ REQUIRE(logs.size() == expected.size());
+ for (size_t idx = 0; idx < expected.size(); ++idx) {
+ bool contains = std::search(logs[idx].begin(), logs[idx].end(),
expected[idx].begin(), expected[idx].end()) != logs[idx].end();
+ REQUIRE(contains);
+ }
+ };
+
+ harness.verify_ = [&] {
+ auto logger = logging::LoggerFactory<minifi::FlowController>::getLogger();
+ // time = 0
+ logger->log_error("not matched");
+ logger->log_error("<begin>one<end>");
+ logger->log_error("not the same but treated so <begin>one<end>");
+ logger->log_error("<begin>two<end>");
+ clock->advance(2s);
+ // time = 2
+ verifyLogsArrived({
+ "<begin>one<end>", "<begin>two<end>"
+ });
+
+ clock->advance(5s);
+ // time = 7
+ // no new logs over HTTP
+
+ logger->log_error("other <begin>one<end>");
+ logger->log_error("new log <begin>three<end>");
+ clock->advance(2s);
+
+ // time = 9
+ verifyLogsArrived({
+ "new log <begin>three<end>"
+ });
+
+ clock->advance(2s);
+ // time = 11
+ logger->log_error("other <begin>one<end>");
+ logger->log_error("new log <begin>three<end>");
+ clock->advance(2s);
+ // time = 13
+
+ verifyLogsArrived({
+ "other <begin>one<end>"
+ });
+
+ return true;
+ };
+
+ harness.run(flow_config_file.string(), dir.getPath());
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt
b/extensions/http-curl/tests/CMakeLists.txt
index 00827fa5d..108d1e85e 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -110,3 +110,4 @@ add_test(NAME C2MetricsTest COMMAND C2MetricsTest
"${TEST_RESOURCES}/TestC2Metri
add_test(NAME C2EmptyMetricTest COMMAND C2EmptyMetricTest
"${TEST_RESOURCES}/TestEmpty.yml")
add_test(NAME C2SameProcessorMetrics COMMAND C2SameProcessorMetrics
"${TEST_RESOURCES}/TestSameProcessorMetrics.yml")
add_test(NAME C2DescribeMetricsTest COMMAND C2DescribeMetricsTest
"${TEST_RESOURCES}/TestSameProcessorMetrics.yml")
+add_test(NAME AlertTests COMMAND AlertTests)
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h
b/extensions/http-curl/tests/ServerAwareHandler.h
index 14b263bfc..3ca23d261 100644
--- a/extensions/http-curl/tests/ServerAwareHandler.h
+++ b/extensions/http-curl/tests/ServerAwareHandler.h
@@ -21,6 +21,8 @@
#include <string>
#include <array>
+#include "CivetServer.h"
+
class ServerAwareHandler: public CivetHandler {
protected:
void sleep_for(std::chrono::milliseconds time) {
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 8ec537140..9bcfd218d 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF)
set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
endif()
-file(GLOB SOURCES "src/agent/agent_docs.cpp" "src/agent/build_description.cpp"
"src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"
"src/core/logging/*.cpp" "src/core/logging/internal/*.cpp"
"src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp"
"src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES}
${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp"
"src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/reposit [...]
+file(GLOB SOURCES "src/agent/agent_docs.cpp" "src/agent/build_description.cpp"
"src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"
"src/core/logging/*.cpp" "src/core/logging/internal/*.cpp"
"src/core/logging/alert/*.cpp" "src/core/state/*.cpp"
"src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp"
"src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES}
"src/core/controller/*.cpp" "src/controllers/*.cpp"
"src/controllers/keyvalue/*.cpp" "src [...]
# manually add this as it might not yet be present when this executes
list(APPEND SOURCES "${CMAKE_CURRENT_BINARY_DIR}/agent_version.cpp")
diff --git a/libminifi/include/FlowControlProtocol.h
b/libminifi/include/FlowControlProtocol.h
index f0b762c68..b85f16a1e 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -32,7 +32,7 @@
#include <thread>
#include <vector>
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/Property.h"
#include "properties/Configure.h"
#include "utils/file/FileUtils.h"
diff --git a/libminifi/include/FlowFileRecord.h
b/libminifi/include/FlowFileRecord.h
index 5ddd4c520..210fa935e 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -34,7 +34,7 @@
#include "io/BaseStream.h"
#include "core/FlowFile.h"
#include "utils/TimeUtil.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "ResourceClaim.h"
#include "Connection.h"
#include "io/OutputStream.h"
diff --git a/libminifi/include/RemoteProcessorGroupPort.h
b/libminifi/include/RemoteProcessorGroupPort.h
index c78413ccb..c717308c9 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -33,7 +33,7 @@
#include "sitetosite/SiteToSiteClient.h"
#include "io/StreamFactory.h"
#include "controllers/SSLContextService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "utils/Export.h"
namespace org::apache::nifi::minifi {
diff --git a/libminifi/include/SchedulingAgent.h
b/libminifi/include/SchedulingAgent.h
index d3f1d6f20..f02f4773a 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -35,7 +35,7 @@
#include "utils/ThreadPool.h"
#include "utils/BackTrace.h"
#include "core/Core.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "properties/Configure.h"
#include "FlowFileRecord.h"
#include "core/logging/Logger.h"
diff --git a/libminifi/include/ThreadedSchedulingAgent.h
b/libminifi/include/ThreadedSchedulingAgent.h
index 8d488040d..c53249096 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -25,7 +25,7 @@
#include <string>
#include <chrono>
#include "properties/Configure.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/Processor.h"
#include "core/Repository.h"
#include "core/ProcessContext.h"
diff --git a/libminifi/include/c2/PayloadSerializer.h
b/libminifi/include/c2/PayloadSerializer.h
index d2fac7bf1..effd297e1 100644
--- a/libminifi/include/c2/PayloadSerializer.h
+++ b/libminifi/include/c2/PayloadSerializer.h
@@ -27,6 +27,7 @@
#include "core/state/Value.h"
#include "c2/C2Protocol.h"
#include "io/BaseStream.h"
+#include "io/BufferStream.h"
#include "utils/gsl.h"
namespace org {
diff --git a/libminifi/include/controllers/LinuxPowerManagementService.h
b/libminifi/include/controllers/LinuxPowerManagementService.h
index ed433b23a..343ddec07 100644
--- a/libminifi/include/controllers/LinuxPowerManagementService.h
+++ b/libminifi/include/controllers/LinuxPowerManagementService.h
@@ -25,7 +25,7 @@
#include "utils/StringUtils.h"
#include "io/validation.h"
#include "core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "ThreadManagementService.h"
namespace org::apache::nifi::minifi::controllers {
diff --git a/libminifi/include/controllers/NetworkPrioritizerService.h
b/libminifi/include/controllers/NetworkPrioritizerService.h
index a7f049426..5a70f92e7 100644
--- a/libminifi/include/controllers/NetworkPrioritizerService.h
+++ b/libminifi/include/controllers/NetworkPrioritizerService.h
@@ -27,7 +27,7 @@
#include "io/validation.h"
#include "controllers/SSLContextService.h"
#include "core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "ThreadManagementService.h"
#include "io/NetworkPrioritizer.h"
#include "utils/Export.h"
diff --git a/libminifi/include/controllers/SSLContextService.h
b/libminifi/include/controllers/SSLContextService.h
index ae88013c3..aa165ab12 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -38,7 +38,7 @@
#include "utils/tls/ExtendedKeyUsage.h"
#include "io/validation.h"
#include "../core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "utils/Export.h"
#include "utils/tls/CertificateUtils.h"
diff --git a/libminifi/include/controllers/ThreadManagementService.h
b/libminifi/include/controllers/ThreadManagementService.h
index 7af8e744c..b61d77234 100644
--- a/libminifi/include/controllers/ThreadManagementService.h
+++ b/libminifi/include/controllers/ThreadManagementService.h
@@ -24,7 +24,7 @@
#include "utils/StringUtils.h"
#include "io/validation.h"
#include "core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
namespace org {
namespace apache {
diff --git a/libminifi/include/controllers/UpdatePolicyControllerService.h
b/libminifi/include/controllers/UpdatePolicyControllerService.h
index 15bd87972..2cc68b1f3 100644
--- a/libminifi/include/controllers/UpdatePolicyControllerService.h
+++ b/libminifi/include/controllers/UpdatePolicyControllerService.h
@@ -23,7 +23,7 @@
#include "utils/StringUtils.h"
#include "io/validation.h"
#include "core/controller/ControllerService.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/state/UpdatePolicy.h"
#include "utils/Export.h"
diff --git
a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
index 3faaf3cc5..46e564df6 100644
---
a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
+++
b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
@@ -26,7 +26,7 @@
#include "core/Core.h"
#include "properties/Configure.h"
#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "utils/Export.h"
namespace org::apache::nifi::minifi::controllers {
diff --git a/libminifi/include/core/FlowConfiguration.h
b/libminifi/include/core/FlowConfiguration.h
index e8239034d..1c2be2e79 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -33,7 +33,7 @@
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "core/Processor.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessGroup.h"
diff --git a/libminifi/include/core/Funnel.h b/libminifi/include/core/Funnel.h
index baae6e8a8..e7783d49d 100644
--- a/libminifi/include/core/Funnel.h
+++ b/libminifi/include/core/Funnel.h
@@ -20,7 +20,7 @@
#include <string>
#include <memory>
-#include "logging/LoggerConfiguration.h"
+#include "logging/LoggerFactory.h"
#include "Processor.h"
namespace org::apache::nifi::minifi::core {
diff --git a/libminifi/include/core/ProcessContext.h
b/libminifi/include/core/ProcessContext.h
index fc7fb25d3..2f4c23e5c 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -33,7 +33,7 @@
#include "core/repository/FileSystemRepository.h"
#include "core/controller/ControllerServiceProvider.h"
#include "core/controller/ControllerServiceLookup.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
#include "ProcessorNode.h"
#include "core/Repository.h"
diff --git a/libminifi/include/core/ProcessContextBuilder.h
b/libminifi/include/core/ProcessContextBuilder.h
index 7ac0f616d..80260860b 100644
--- a/libminifi/include/core/ProcessContextBuilder.h
+++ b/libminifi/include/core/ProcessContextBuilder.h
@@ -32,7 +32,7 @@
#include "core/repository/FileSystemRepository.h"
#include "core/controller/ControllerServiceProvider.h"
#include "core/controller/ControllerServiceLookup.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "ProcessContext.h"
#include "ProcessorNode.h"
#include "core/Repository.h"
diff --git a/libminifi/include/core/ProcessSession.h
b/libminifi/include/core/ProcessSession.h
index 9e0a48263..29054e008 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -31,7 +31,7 @@
#include "ProcessContext.h"
#include "FlowFileRecord.h"
#include "Exception.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/Deprecated.h"
#include "FlowFile.h"
#include "WeakReference.h"
diff --git a/libminifi/include/core/ProcessSessionReadCallback.h
b/libminifi/include/core/ProcessSessionReadCallback.h
index 19a57c8c5..a89003b90 100644
--- a/libminifi/include/core/ProcessSessionReadCallback.h
+++ b/libminifi/include/core/ProcessSessionReadCallback.h
@@ -23,7 +23,7 @@
#include <memory>
#include <string>
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "io/BaseStream.h"
#include "FlowFileRecord.h"
diff --git a/libminifi/include/core/Repository.h
b/libminifi/include/core/Repository.h
index afe3e5750..5e265d091 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -34,7 +34,7 @@
#include "core/ContentRepository.h"
#include "core/SerializableComponent.h"
#include "properties/Configure.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/Property.h"
#include "ResourceClaim.h"
#include "utils/TimeUtil.h"
diff --git a/libminifi/include/core/controller/StandardControllerServiceNode.h
b/libminifi/include/core/controller/StandardControllerServiceNode.h
index 90c693a65..df65ef91a 100644
--- a/libminifi/include/core/controller/StandardControllerServiceNode.h
+++ b/libminifi/include/core/controller/StandardControllerServiceNode.h
@@ -22,7 +22,7 @@
#include "core/Core.h"
#include "ControllerServiceNode.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/ProcessGroup.h"
namespace org::apache::nifi::minifi::core::controller {
diff --git
a/libminifi/include/core/controller/StandardControllerServiceProvider.h
b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index c9402510b..92b12890c 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -31,7 +31,7 @@
#include "ControllerServiceNode.h"
#include "StandardControllerServiceNode.h"
#include "ControllerServiceProvider.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
namespace org {
namespace apache {
diff --git a/libminifi/include/core/logging/LoggerConfiguration.h
b/libminifi/include/core/logging/LoggerConfiguration.h
index 7b314c026..b9c9197a5 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -25,6 +25,7 @@
#include <map>
#include <mutex>
#include <string>
+#include <unordered_set>
#include "spdlog/common.h"
#include "spdlog/sinks/rotating_file_sink.h"
@@ -37,6 +38,8 @@
#include "core/logging/Logger.h"
#include "LoggerProperties.h"
#include "internal/CompressionManager.h"
+#include "core/logging/LoggerFactory.h"
+#include "alert/AlertSink.h"
class LoggerTestAccessor;
@@ -53,6 +56,8 @@ struct LoggerNamespace {
std::vector<std::shared_ptr<spdlog::sinks::sink>> exported_sinks;
std::map<std::string, std::shared_ptr<LoggerNamespace>> children;
+ void forEachSink(const std::function<void(const
std::shared_ptr<spdlog::sinks::sink>&)>& op) const;
+
LoggerNamespace()
: level(spdlog::level::off),
has_level(false),
@@ -96,6 +101,8 @@ class LoggerConfiguration {
return getCompressedLog(std::chrono::milliseconds{0}, flush);
}
+ void initializeAlertSinks(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<AgentIdentificationProvider>& agent_id);
+
template<class Rep, class Period>
static std::unique_ptr<io::InputStream> getCompressedLog(const
std::chrono::duration<Rep, Period>& time, bool flush = false) {
return getConfiguration().compression_manager_.getCompressedLog(time,
flush);
@@ -109,9 +116,9 @@ class LoggerConfiguration {
static const char *spdlog_default_pattern;
protected:
- static std::shared_ptr<internal::LoggerNamespace>
initialize_namespaces(const std::shared_ptr<LoggerProperties>
&logger_properties);
- static std::shared_ptr<spdlog::logger> get_logger(std::shared_ptr<Logger>
logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace, const
std::string &name,
-
std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present = false);
+ static std::shared_ptr<internal::LoggerNamespace>
initialize_namespaces(const std::shared_ptr<LoggerProperties>
&logger_properties, const std::shared_ptr<Logger> &logger = {});
+ static std::shared_ptr<spdlog::logger> get_logger(const
std::shared_ptr<Logger> &logger, const
std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string
&name,
+ const
std::shared_ptr<spdlog::formatter> &formatter, bool remove_if_present = false);
private:
std::shared_ptr<Logger> getLogger(const std::string& name, const
std::lock_guard<std::mutex>& lock);
@@ -149,24 +156,8 @@ class LoggerConfiguration {
std::mutex mutex;
std::shared_ptr<LoggerImpl> logger_ = nullptr;
std::shared_ptr<LoggerControl> controller_;
+ std::unordered_set<std::shared_ptr<AlertSink>> alert_sinks_;
bool shorten_names_;
};
-template<typename T>
-class LoggerFactory {
- public:
- /**
- * Gets an initialized logger for the template class.
- */
- static std::shared_ptr<Logger> getLogger() {
- static std::shared_ptr<Logger> logger =
LoggerConfiguration::getConfiguration().getLogger(core::getClassName<T>());
- return logger;
- }
-
- static std::shared_ptr<Logger> getAliasedLogger(const std::string &alias) {
- std::shared_ptr<Logger> logger =
LoggerConfiguration::getConfiguration().getLogger(alias);
- return logger;
- }
-};
-
} // namespace org::apache::nifi::minifi::core::logging
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h
b/libminifi/include/core/logging/LoggerFactory.h
similarity index 51%
copy from extensions/http-curl/tests/ServerAwareHandler.h
copy to libminifi/include/core/logging/LoggerFactory.h
index 14b263bfc..104d9437a 100644
--- a/extensions/http-curl/tests/ServerAwareHandler.h
+++ b/libminifi/include/core/logging/LoggerFactory.h
@@ -19,38 +19,28 @@
#pragma once
#include <string>
-#include <array>
+#include <memory>
-class ServerAwareHandler: public CivetHandler {
- protected:
- void sleep_for(std::chrono::milliseconds time) {
- std::unique_lock<std::mutex> lock(mutex_);
- stop_signal_.wait_for(lock, time, [&] {return terminate_.load();});
- }
-
- bool isServerRunning() const {
- return !terminate_.load();
- }
+#include "core/logging/Logger.h"
+#include "core/Core.h"
- virtual std::string readPayload(struct mg_connection* conn) {
- std::string response;
- int readBytes;
+namespace org::apache::nifi::minifi::core::logging {
- std::array<char, 1024> buffer;
- while ((readBytes = mg_read(conn, buffer.data(), buffer.size())) > 0) {
- response.append(buffer.data(), readBytes);
- }
- return response;
- }
+class LoggerFactoryBase {
+ public:
+ static std::shared_ptr<Logger> getAliasedLogger(const std::string &alias);
+};
+template<typename T>
+class LoggerFactory : public LoggerFactoryBase {
public:
- void stop() {
- terminate_ = true;
- stop_signal_.notify_all();
+ /**
+ * Gets an initialized logger for the template class.
+ */
+ static std::shared_ptr<Logger> getLogger() {
+ static std::shared_ptr<Logger> logger =
getAliasedLogger(core::getClassName<T>());
+ return logger;
}
-
- private:
- std::mutex mutex_;
- std::condition_variable stop_signal_;
- std::atomic_bool terminate_{false};
};
+
+} // namespace org::apache::nifi::minifi::core::logging
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h
b/libminifi/include/core/logging/Utils.h
similarity index 50%
copy from extensions/http-curl/tests/ServerAwareHandler.h
copy to libminifi/include/core/logging/Utils.h
index 14b263bfc..5f4acf2d6 100644
--- a/extensions/http-curl/tests/ServerAwareHandler.h
+++ b/libminifi/include/core/logging/Utils.h
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -18,39 +17,12 @@
#pragma once
+#include <optional>
#include <string>
-#include <array>
-
-class ServerAwareHandler: public CivetHandler {
- protected:
- void sleep_for(std::chrono::milliseconds time) {
- std::unique_lock<std::mutex> lock(mutex_);
- stop_signal_.wait_for(lock, time, [&] {return terminate_.load();});
- }
-
- bool isServerRunning() const {
- return !terminate_.load();
- }
-
- virtual std::string readPayload(struct mg_connection* conn) {
- std::string response;
- int readBytes;
+#include "spdlog/common.h"
- std::array<char, 1024> buffer;
- while ((readBytes = mg_read(conn, buffer.data(), buffer.size())) > 0) {
- response.append(buffer.data(), readBytes);
- }
- return response;
- }
+namespace org::apache::nifi::minifi::utils {
- public:
- void stop() {
- terminate_ = true;
- stop_signal_.notify_all();
- }
+std::optional<spdlog::level::level_enum> parse_log_level(const std::string&
level_name);
- private:
- std::mutex mutex_;
- std::condition_variable stop_signal_;
- std::atomic_bool terminate_{false};
-};
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/core/logging/alert/AlertSink.h
b/libminifi/include/core/logging/alert/AlertSink.h
new file mode 100644
index 000000000..6286a686c
--- /dev/null
+++ b/libminifi/include/core/logging/alert/AlertSink.h
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "utils/RegexUtils.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+namespace org::apache::nifi::minifi::controllers {
+class SSLContextService;
+} // namespace org::apache::nifi::minifi::controllers
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+ public:
+ AlertSink(const AlertSink&) = delete;
+ AlertSink(AlertSink&&) = delete;
+ AlertSink& operator=(const AlertSink&&) = delete;
+ AlertSink& operator=(AlertSink&&) = delete;
+
+ static std::shared_ptr<AlertSink> create(const std::string&
prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties,
std::shared_ptr<Logger> logger);
+
+ void initialize(core::controller::ControllerServiceProvider* controller,
std::shared_ptr<AgentIdentificationProvider> agent_id);
+
+ ~AlertSink() override;
+
+ private:
+ struct Config {
+ std::string url;
+ std::optional<std::string> ssl_service_name;
+ int batch_size;
+ std::chrono::milliseconds flush_period;
+ std::chrono::milliseconds rate_limit;
+ int buffer_limit;
+ utils::Regex filter;
+ spdlog::level::level_enum level;
+ };
+
+ struct Services {
+ std::shared_ptr<controllers::SSLContextService> ssl_service;
+ std::shared_ptr<AgentIdentificationProvider> agent_id;
+ };
+
+ class LogBuffer {
+ friend class AlertSink;
+ public:
+ static LogBuffer allocate(size_t size);
+ LogBuffer commit();
+ [[nodiscard]]
+ size_t size() const;
+ private:
+ size_t size_{0};
+ std::deque<std::pair<std::string, size_t>> data_;
+ };
+
+ class LiveLogSet {
+ using Hash = size_t;
+ const std::chrono::milliseconds lifetime_{};
+ std::unordered_set<Hash> hashes_to_ignore_;
+ std::deque<std::pair<std::chrono::milliseconds, Hash>> timestamped_hashes_;
+ public:
+ explicit LiveLogSet(std::chrono::milliseconds lifetime):
lifetime_(lifetime) {}
+
+ bool tryAdd(std::chrono::milliseconds now, Hash hash);
+ };
+
+ AlertSink(Config config, std::shared_ptr<Logger> logger);
+
+ void run();
+ void send(Services& services);
+
+ void sink_it_(const spdlog::details::log_msg& msg) override;
+ void flush_() override;
+
+ Config config_;
+ LiveLogSet live_logs_;
+
+ std::atomic_bool running_{true};
+ std::mutex mtx_;
+ std::chrono::milliseconds next_flush_;
+ std::condition_variable cv_;
+ std::thread flush_thread_;
+
+ utils::StagingQueue<LogBuffer> buffer_;
+
+ std::shared_ptr<utils::timeutils::Clock> clock_ =
utils::timeutils::getClock();
+ std::atomic<gsl::owner<Services*>> services_{nullptr};
+
+ std::shared_ptr<Logger> logger_;
+};
+
+} // namespace org::apache::nifi::minifi::core::logging
diff --git
a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
index e9a1083da..f1c3e31bd 100644
--- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
+++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
@@ -27,7 +27,7 @@
#include "core/ProcessSession.h"
#include "RemoteProcessorGroupPort.h"
#include "io/StreamFactory.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
namespace org::apache::nifi::minifi::core::reporting {
diff --git a/libminifi/include/core/repository/FileSystemRepository.h
b/libminifi/include/core/repository/FileSystemRepository.h
index c2b8c7505..01e231c2a 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -24,7 +24,7 @@
#include "core/Core.h"
#include "../ContentRepository.h"
#include "properties/Configure.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
namespace org {
namespace apache {
namespace nifi {
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h
b/libminifi/include/core/repository/VolatileContentRepository.h
index 453b1d1ff..52122ebad 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -29,7 +29,7 @@
#include "core/repository/VolatileRepository.h"
#include "properties/Configure.h"
#include "core/Connectable.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "utils/GeneralUtils.h"
namespace org {
diff --git a/libminifi/include/core/state/UpdateController.h
b/libminifi/include/core/state/UpdateController.h
index 496b523a2..4fc93883f 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -25,6 +25,7 @@
#include <map>
#include "utils/ThreadPool.h"
#include "utils/BackTrace.h"
+#include "io/InputStream.h"
namespace org {
namespace apache {
diff --git a/libminifi/include/core/yaml/CheckRequiredField.h
b/libminifi/include/core/yaml/CheckRequiredField.h
index 0967a723b..2f0e4b0eb 100644
--- a/libminifi/include/core/yaml/CheckRequiredField.h
+++ b/libminifi/include/core/yaml/CheckRequiredField.h
@@ -21,7 +21,7 @@
#include <memory>
#include <vector>
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "yaml-cpp/yaml.h"
namespace org {
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h
b/libminifi/include/core/yaml/YamlConfiguration.h
index bd9e01e20..18ff1ee19 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -24,7 +24,7 @@
#include <unordered_set>
#include "core/FlowConfiguration.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/ProcessorConfig.h"
#include "Exception.h"
#include "io/StreamFactory.h"
diff --git a/libminifi/include/core/yaml/YamlConnectionParser.h
b/libminifi/include/core/yaml/YamlConnectionParser.h
index 6c906eabe..e1c544d77 100644
--- a/libminifi/include/core/yaml/YamlConnectionParser.h
+++ b/libminifi/include/core/yaml/YamlConnectionParser.h
@@ -22,7 +22,7 @@
#include <string>
#include "core/ProcessGroup.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "yaml-cpp/yaml.h"
#include "utils/gsl.h"
diff --git a/libminifi/include/io/AtomicEntryStream.h
b/libminifi/include/io/AtomicEntryStream.h
index f4f23c0f1..dc3dc0581 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -27,7 +27,7 @@
#include "BaseStream.h"
#include "core/repository/AtomicRepoEntries.h"
#include "Exception.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "utils/gsl.h"
namespace org {
diff --git a/libminifi/include/io/DescriptorStream.h
b/libminifi/include/io/DescriptorStream.h
index 60eb81ca8..e0e9bb224 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -25,7 +25,7 @@
#include <string>
#include "EndianCheck.h"
#include "BaseStream.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
namespace org {
namespace apache {
diff --git a/libminifi/include/io/FileStream.h
b/libminifi/include/io/FileStream.h
index da8f2263d..754d3ab59 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -23,7 +23,7 @@
#include <fstream>
#include <string>
#include "BaseStream.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
namespace org {
namespace apache {
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h
b/libminifi/include/io/tls/SecureDescriptorStream.h
index fbef71de9..eb2dd8ddd 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -32,7 +32,7 @@
#include <unistd.h>
#endif
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "io/BaseStream.h"
#include "io/EndianCheck.h"
diff --git a/libminifi/include/properties/Configuration.h
b/libminifi/include/properties/Configuration.h
index c4c6abd9b..906bdf0c2 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -154,6 +154,16 @@ class Configuration : public Properties {
static constexpr const char *nifi_log_compression_cached_log_max_size =
"nifi.log.compression.cached.log.max.size";
static constexpr const char *nifi_log_compression_compressed_log_max_size =
"nifi.log.compression.compressed.log.max.size";
+ // alert options
+ static constexpr const char *nifi_log_alert_url = "nifi.log.alert.url";
+ static constexpr const char *nifi_log_alert_ssl_context_service =
"nifi.log.alert.ssl.context.service";
+ static constexpr const char *nifi_log_alert_batch_size =
"nifi.log.alert.batch.size";
+ static constexpr const char *nifi_log_alert_flush_period =
"nifi.log.alert.flush.period";
+ static constexpr const char *nifi_log_alert_filter = "nifi.log.alert.filter";
+ static constexpr const char *nifi_log_alert_rate_limit =
"nifi.log.alert.rate.limit";
+ static constexpr const char *nifi_log_alert_buffer_limit =
"nifi.log.alert.buffer.limit";
+ static constexpr const char *nifi_log_alert_level = "nifi.log.alert.level";
+
static constexpr const char *nifi_asset_directory = "nifi.asset.directory";
// Metrics publisher options
diff --git a/libminifi/include/properties/Properties.h
b/libminifi/include/properties/Properties.h
index 2a8325d70..ca227d145 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -49,7 +49,7 @@ class Properties {
};
public:
- explicit Properties(const std::string& name = "");
+ explicit Properties(std::string name = "");
virtual ~Properties() = default;
diff --git a/libminifi/include/provenance/Provenance.h
b/libminifi/include/provenance/Provenance.h
index 7941b5465..3eab418ae 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -36,7 +36,7 @@
#include "properties/Configure.h"
#include "Connection.h"
#include "FlowFileRecord.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "ResourceClaim.h"
#include "utils/gsl.h"
#include "utils/Id.h"
diff --git a/libminifi/include/sitetosite/Peer.h
b/libminifi/include/sitetosite/Peer.h
index 24f525cc9..a35b416f9 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -27,7 +27,7 @@
#include <string>
#include <utility>
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/Property.h"
#include "io/BaseStream.h"
#include "io/ClientSocket.h"
diff --git a/libminifi/include/sitetosite/RawSocketProtocol.h
b/libminifi/include/sitetosite/RawSocketProtocol.h
index 6bc9c566f..b97e32659 100644
--- a/libminifi/include/sitetosite/RawSocketProtocol.h
+++ b/libminifi/include/sitetosite/RawSocketProtocol.h
@@ -32,7 +32,7 @@
#include <utility>
#include <vector>
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Property.h"
diff --git a/libminifi/include/utils/ByteArrayCallback.h
b/libminifi/include/utils/ByteArrayCallback.h
index 503589300..8cd339d01 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -24,7 +24,7 @@
#include "concurrentqueue.h"
#include "FlowFileRecord.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "utils/gsl.h"
namespace org::apache::nifi::minifi::utils {
diff --git a/libminifi/include/utils/FlowFileQueue.h
b/libminifi/include/utils/FlowFileQueue.h
index 952d3fc12..cebd939d6 100644
--- a/libminifi/include/utils/FlowFileQueue.h
+++ b/libminifi/include/utils/FlowFileQueue.h
@@ -104,7 +104,7 @@ class FlowFileQueue {
std::optional<LoadTask> load_task_;
MinMaxHeap<value_type, FlowFilePenaltyExpirationComparator> queue_;
- std::unique_ptr<timeutils::SteadyClock>
clock_{std::make_unique<timeutils::SteadyClock>()};
+ std::shared_ptr<timeutils::SteadyClock> clock_{timeutils::getClock()};
std::shared_ptr<core::logging::Logger> logger_;
};
diff --git a/libminifi/include/utils/Hash.h b/libminifi/include/utils/Hash.h
new file mode 100644
index 000000000..0afd33692
--- /dev/null
+++ b/libminifi/include/utils/Hash.h
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+
+namespace org::apache::nifi::minifi::utils {
+
+// from the boost hash_combine docs
+inline size_t hash_combine(size_t seed, size_t new_hash) noexcept {
+ return seed ^ (new_hash + 0x9e3779b9 + (seed << 6U) + (seed >> 2U));
+}
+
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index 9cff1d76a..e4133e0aa 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -31,6 +31,7 @@ class uuid;
#include "core/logging/Logger.h"
#include "properties/Properties.h"
#include "SmallString.h"
+#include "Hash.h"
#define UUID_TIME_IMPL 0
#define UUID_RANDOM_IMPL 1
@@ -141,10 +142,6 @@ struct hash<org::apache::nifi::minifi::utils::Identifier> {
size_t operator()(const org::apache::nifi::minifi::utils::Identifier& id)
const noexcept {
static_assert(sizeof(org::apache::nifi::minifi::utils::Identifier) %
sizeof(size_t) == 0);
constexpr int slices =
sizeof(org::apache::nifi::minifi::utils::Identifier) / sizeof(size_t);
- const auto combine = [](size_t& seed, size_t new_hash) {
- // from the boost hash_combine docs
- seed ^= new_hash + 0x9e3779b9 + (seed << 6) + (seed >> 2);
- };
const auto get_slice = [](const
org::apache::nifi::minifi::utils::Identifier& id, size_t idx) -> size_t {
size_t result{};
memcpy(&result, reinterpret_cast<const unsigned char*>(&id.data_) + idx
* sizeof(size_t), sizeof(size_t));
@@ -152,7 +149,7 @@ struct hash<org::apache::nifi::minifi::utils::Identifier> {
};
size_t hash = get_slice(id, 0);
for (size_t i = 1; i < slices; ++i) {
- combine(hash, get_slice(id, i));
+ hash = org::apache::nifi::minifi::utils::hash_combine(hash,
get_slice(id, i));
}
return hash;
}
diff --git a/libminifi/include/utils/ListingStateManager.h
b/libminifi/include/utils/ListingStateManager.h
index 0ca25e161..d97e1492e 100644
--- a/libminifi/include/utils/ListingStateManager.h
+++ b/libminifi/include/utils/ListingStateManager.h
@@ -27,7 +27,7 @@
#include "core/CoreComponentState.h"
#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
namespace org::apache::nifi::minifi::utils {
diff --git a/libminifi/include/utils/TestUtils.h
b/libminifi/include/utils/TestUtils.h
index 37e1543f3..99b964037 100644
--- a/libminifi/include/utils/TestUtils.h
+++ b/libminifi/include/utils/TestUtils.h
@@ -22,6 +22,7 @@
#include <fstream>
#include <memory>
#include <string>
+#include <unordered_set>
#include "utils/file/FileUtils.h"
#include "utils/Environment.h"
@@ -60,34 +61,52 @@ Identifier generateUUID() {
return id_generator->generate();
}
-class ManualClock : public timeutils::Clock {
+class ManualClock : public timeutils::SteadyClock {
public:
- [[nodiscard]] std::chrono::milliseconds timeSinceEpoch() const override {
return time_; }
- void advance(std::chrono::milliseconds elapsed_time) { time_ +=
elapsed_time; }
+ [[nodiscard]] std::chrono::milliseconds timeSinceEpoch() const override {
+ std::lock_guard lock(mtx_);
+ return time_;
+ }
- private:
- std::chrono::milliseconds time_{0};
-};
+ [[nodiscard]] std::chrono::time_point<std::chrono::steady_clock> now() const
override {
+ return std::chrono::steady_clock::time_point{timeSinceEpoch()};
+ }
-class ManualSteadyClock : public timeutils::SteadyClock {
- public:
- std::chrono::milliseconds timeSinceEpoch() const override { return time_; }
void advance(std::chrono::milliseconds elapsed_time) {
if (elapsed_time.count() < 0) {
throw std::logic_error("A steady clock can only be advanced forward");
}
+ std::lock_guard lock(mtx_);
time_ += elapsed_time;
+ for (auto* cv : cvs_) {
+ cv->notify_all();
+ }
}
- std::chrono::steady_clock::time_point now() const override {
- return std::chrono::steady_clock::time_point{time_};
+ bool wait_until(std::condition_variable& cv, std::unique_lock<std::mutex>&
lck, std::chrono::milliseconds time, const std::function<bool()>& pred)
override {
+ std::chrono::milliseconds now;
+ {
+ std::unique_lock lock(mtx_);
+ now = time_;
+ cvs_.insert(&cv);
+ }
+ cv.wait_for(lck, time - now, [&] {
+ now = timeSinceEpoch();
+ return now >= time || pred();
+ });
+ {
+ std::unique_lock lock(mtx_);
+ cvs_.erase(&cv);
+ }
+ return pred();
}
private:
+ mutable std::mutex mtx_;
+ std::unordered_set<std::condition_variable*> cvs_;
std::chrono::milliseconds time_{0};
};
-
#ifdef WIN32
// The tzdata location is set as a global variable in date-tz library
// We need to set it from from libminifi to effect calls made from libminifi
(on Windows)
diff --git a/libminifi/include/utils/TimeUtil.h
b/libminifi/include/utils/TimeUtil.h
index 906d851fe..9e3038a6c 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -29,6 +29,8 @@
#include <optional>
#include <functional>
#include <algorithm>
+#include <condition_variable>
+#include <memory>
// libc++ doesn't define operator<=> on durations, and apparently the operator
rewrite rules don't automagically make one
#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
@@ -70,6 +72,9 @@ class Clock {
public:
virtual ~Clock() = default;
virtual std::chrono::milliseconds timeSinceEpoch() const = 0;
+ virtual bool wait_until(std::condition_variable& cv,
std::unique_lock<std::mutex>& lck, std::chrono::milliseconds time, const
std::function<bool()>& pred) {
+ return cv.wait_for(lck, time - timeSinceEpoch(), pred);
+ }
};
class SystemClock : public Clock {
@@ -90,6 +95,11 @@ class SteadyClock : public Clock {
}
};
+std::shared_ptr<SteadyClock> getClock();
+
+// test-only utility to specify what clock to use
+void setClock(std::shared_ptr<SteadyClock> clock);
+
inline std::string getTimeStr(std::chrono::system_clock::time_point tp) {
std::ostringstream stream;
date::to_stream(stream, TIME_FORMAT,
std::chrono::floor<std::chrono::milliseconds>(tp));
diff --git a/libminifi/include/utils/file/FileSystem.h
b/libminifi/include/utils/file/FileSystem.h
index a6cafc0cf..cc873accb 100644
--- a/libminifi/include/utils/file/FileSystem.h
+++ b/libminifi/include/utils/file/FileSystem.h
@@ -21,7 +21,7 @@
#include <optional>
#include <string>
#include "utils/crypto/EncryptionProvider.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
namespace org {
namespace apache {
diff --git a/libminifi/include/utils/file/FileUtils.h
b/libminifi/include/utils/file/FileUtils.h
index fa8f02a64..d1b582840 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -70,7 +70,7 @@
#endif
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#include "utils/StringUtils.h"
#include "utils/file/PathUtils.h"
#include "utils/gsl.h"
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index b471bf202..ccd8fcab9 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -124,6 +124,14 @@ const std::vector<core::ConfigurationProperty>
Configuration::CONFIGURATION_PROP
core::ConfigurationProperty{Configuration::nifi_log_logger_root},
core::ConfigurationProperty{Configuration::nifi_log_compression_cached_log_max_size,
gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size,
gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
+ core::ConfigurationProperty{Configuration::nifi_log_alert_url},
+
core::ConfigurationProperty{Configuration::nifi_log_alert_ssl_context_service},
+ core::ConfigurationProperty{Configuration::nifi_log_alert_batch_size},
+ core::ConfigurationProperty{Configuration::nifi_log_alert_flush_period},
+ core::ConfigurationProperty{Configuration::nifi_log_alert_filter},
+ core::ConfigurationProperty{Configuration::nifi_log_alert_rate_limit},
+ core::ConfigurationProperty{Configuration::nifi_log_alert_buffer_limit},
+ core::ConfigurationProperty{Configuration::nifi_log_alert_level},
core::ConfigurationProperty{Configuration::nifi_asset_directory},
core::ConfigurationProperty{Configuration::nifi_metrics_publisher_class},
core::ConfigurationProperty{Configuration::nifi_metrics_publisher_prometheus_metrics_publisher_port,
gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())},
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 2339e968c..79923a69f 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -376,6 +376,7 @@ int16_t FlowController::start() {
this->root_->startProcessing(timer_scheduler_, event_scheduler_,
cron_scheduler_);
}
C2Client::initialize(this, this, this);
+
core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this,
configuration_);
running_ = true;
this->protocol_->start();
this->provenance_repo_->start();
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp
b/libminifi/src/core/logging/LoggerConfiguration.cpp
index 10a53a036..770fcf6f7 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -36,8 +36,10 @@
#include "utils/file/FileUtils.h"
#include "utils/Environment.h"
#include "core/logging/internal/LogCompressorSink.h"
+#include "core/logging/alert/AlertSink.h"
#include "utils/Literals.h"
#include "core/TypedValues.h"
+#include "core/logging/Utils.h"
#include "spdlog/spdlog.h"
#include "spdlog/sinks/stdout_sinks.h"
@@ -60,26 +62,21 @@ namespace org::apache::nifi::minifi::core::logging {
const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d
%H:%M:%S.%e] [%n] [%l] %v";
-namespace {
-std::optional<spdlog::level::level_enum> parse_log_level(const std::string&
level_name) {
- if (utils::StringUtils::equalsIgnoreCase(level_name, "trace")) {
- return spdlog::level::trace;
- } else if (utils::StringUtils::equalsIgnoreCase(level_name, "debug")) {
- return spdlog::level::debug;
- } else if (utils::StringUtils::equalsIgnoreCase(level_name, "info")) {
- return spdlog::level::info;
- } else if (utils::StringUtils::equalsIgnoreCase(level_name, "warn")) {
- return spdlog::level::warn;
- } else if (utils::StringUtils::equalsIgnoreCase(level_name, "error")) {
- return spdlog::level::err;
- } else if (utils::StringUtils::equalsIgnoreCase(level_name, "critical")) {
- return spdlog::level::critical;
- } else if (utils::StringUtils::equalsIgnoreCase(level_name, "off")) {
- return spdlog::level::off;
+namespace internal {
+
+void LoggerNamespace::forEachSink(const std::function<void(const
std::shared_ptr<spdlog::sinks::sink>&)>& op) const {
+ for (auto& sink : sinks) {
+ op(sink);
+ }
+ for (auto& sink : exported_sinks) {
+ op(sink);
+ }
+ for (auto& [name, child] : children) {
+ child->forEachSink(op);
}
- return std::nullopt;
}
-} // namespace
+
+} // namespace internal
std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string
&type) {
std::vector<std::string> appenders;
@@ -108,7 +105,13 @@ LoggerConfiguration&
LoggerConfiguration::getConfiguration() {
void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties>
&logger_properties) {
std::lock_guard<std::mutex> lock(mutex);
- root_namespace_ = initialize_namespaces(logger_properties);
+ root_namespace_ = initialize_namespaces(logger_properties, logger_);
+ alert_sinks_.clear();
+ root_namespace_->forEachSink([&] (const
std::shared_ptr<spdlog::sinks::sink>& sink) {
+ if (auto alert_sink = std::dynamic_pointer_cast<AlertSink>(sink)) {
+ alert_sinks_.insert(std::move(alert_sink));
+ }
+ });
initializeCompression(lock, logger_properties);
std::string spdlog_pattern;
if (!logger_properties->getString("spdlog.pattern", spdlog_pattern)) {
@@ -163,7 +166,7 @@ std::shared_ptr<spdlog::logger>
LoggerConfiguration::getSpdlogLogger(const std::
return spdlog::get(name);
}
-std::shared_ptr<internal::LoggerNamespace>
LoggerConfiguration::initialize_namespaces(const
std::shared_ptr<LoggerProperties> &logger_properties) {
+std::shared_ptr<internal::LoggerNamespace>
LoggerConfiguration::initialize_namespaces(const
std::shared_ptr<LoggerProperties> &logger_properties, const
std::shared_ptr<Logger> &logger) {
std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sink_map =
logger_properties->initial_sinks();
std::string appender_type = "appender";
@@ -185,6 +188,10 @@ std::shared_ptr<internal::LoggerNamespace>
LoggerConfiguration::initialize_names
sink_map[appender_name] =
std::make_shared<spdlog::sinks::stderr_sink_mt>();
} else if ("syslog" == appender_type) {
sink_map[appender_name] = LoggerConfiguration::create_syslog_sink();
+ } else if ("alert" == appender_type) {
+ if (auto sink = AlertSink::create(appender_key, logger_properties,
logger)) {
+ sink_map[appender_name] = sink;
+ }
} else {
sink_map[appender_name] = LoggerConfiguration::create_fallback_sink();
}
@@ -204,12 +211,16 @@ std::shared_ptr<internal::LoggerNamespace>
LoggerConfiguration::initialize_names
std::string level_name = utils::StringUtils::trim(segment);
if (first) {
first = false;
- auto opt_level = parse_log_level(level_name);
+ auto opt_level = utils::parse_log_level(level_name);
if (opt_level) {
level = *opt_level;
}
} else {
- sinks.push_back(sink_map[level_name]);
+ if (auto it = sink_map.find(level_name); it != sink_map.end()) {
+ sinks.push_back(it->second);
+ } else {
+ logger->log_error("Couldn't find sink '%s'", level_name);
+ }
}
}
std::shared_ptr<internal::LoggerNamespace> current_namespace =
root_namespace;
@@ -233,8 +244,8 @@ std::shared_ptr<internal::LoggerNamespace>
LoggerConfiguration::initialize_names
return root_namespace;
}
-std::shared_ptr<spdlog::logger>
LoggerConfiguration::get_logger(std::shared_ptr<Logger> logger, const
std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string
&name,
-
std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present) {
+std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(const
std::shared_ptr<Logger> &logger, const
std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string
&name,
+ const
std::shared_ptr<spdlog::formatter> &formatter, bool remove_if_present) {
std::shared_ptr<spdlog::logger> spdlogger = spdlog::get(name);
if (spdlogger) {
if (remove_if_present) {
@@ -316,6 +327,13 @@ void LoggerConfiguration::initializeCompression(const
std::lock_guard<std::mutex
}
}
+void
LoggerConfiguration::initializeAlertSinks(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<AgentIdentificationProvider>& agent_id) {
+ std::lock_guard guard(mutex);
+ for (auto& sink : alert_sinks_) {
+ sink->initialize(controller, agent_id);
+ }
+}
+
std::shared_ptr<spdlog::sinks::rotating_file_sink_mt>
LoggerConfiguration::getRotatingFileSink(const std::string& appender_key, const
std::shared_ptr<LoggerProperties>& properties) {
// According to spdlog docs, if two loggers write to the same file, they
must use the same sink object.
// Note that some logging configuration changes will not take effect until
MiNiFi is restarted.
diff --git a/libminifi/src/core/logging/LoggerFactory.cpp
b/libminifi/src/core/logging/LoggerFactory.cpp
new file mode 100644
index 000000000..b7b7f591b
--- /dev/null
+++ b/libminifi/src/core/logging/LoggerFactory.cpp
@@ -0,0 +1,28 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/LoggerFactory.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+std::shared_ptr<Logger> LoggerFactoryBase::getAliasedLogger(const std::string
&alias) {
+ return LoggerConfiguration::getConfiguration().getLogger(alias);
+}
+
+} // namespace org::apache::nifi::minifi::core::logging
diff --git a/libminifi/src/core/logging/Utils.cpp
b/libminifi/src/core/logging/Utils.cpp
new file mode 100644
index 000000000..68dc2c671
--- /dev/null
+++ b/libminifi/src/core/logging/Utils.cpp
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/Utils.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+std::optional<spdlog::level::level_enum> parse_log_level(const std::string&
level_name) {
+ if (utils::StringUtils::equalsIgnoreCase(level_name, "trace")) {
+ return spdlog::level::trace;
+ } else if (utils::StringUtils::equalsIgnoreCase(level_name, "debug")) {
+ return spdlog::level::debug;
+ } else if (utils::StringUtils::equalsIgnoreCase(level_name, "info")) {
+ return spdlog::level::info;
+ } else if (utils::StringUtils::equalsIgnoreCase(level_name, "warn")) {
+ return spdlog::level::warn;
+ } else if (utils::StringUtils::equalsIgnoreCase(level_name, "error")) {
+ return spdlog::level::err;
+ } else if (utils::StringUtils::equalsIgnoreCase(level_name, "critical")) {
+ return spdlog::level::critical;
+ } else if (utils::StringUtils::equalsIgnoreCase(level_name, "off")) {
+ return spdlog::level::off;
+ }
+ return std::nullopt;
+}
+
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/core/logging/alert/AlertSink.cpp
b/libminifi/src/core/logging/alert/AlertSink.cpp
new file mode 100644
index 000000000..726c8b995
--- /dev/null
+++ b/libminifi/src/core/logging/alert/AlertSink.cpp
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+ : config_(std::move(config)),
+ live_logs_(config_.rate_limit),
+ buffer_(config_.buffer_limit, config_.batch_size),
+ logger_(std::move(logger)) {
+ set_level(config_.level);
+ next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+ flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string&
prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties,
std::shared_ptr<Logger> logger) {
+ Config config;
+
+ if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+ config.url = url.value();
+ } else {
+ logger->log_info("Missing '%s.url' value, network logging won't be
available", prop_name_prefix);
+ return {};
+ }
+
+ if (auto filter_str = logger_properties->getString(prop_name_prefix +
".filter")) {
+ try {
+ config.filter = utils::Regex{filter_str.value()};
+ } catch (const std::regex_error& err) {
+ logger->log_error("Invalid '%s.filter' value, network logging won't be
available: %s", prop_name_prefix, err.what());
+ return {};
+ }
+ } else {
+ logger->log_error("Missing '%s.filter' value, network logging won't be
available", prop_name_prefix);
+ return {};
+ }
+
+ auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+ if (auto prop_str = logger_properties->getString(prop_name_prefix +
suffix)) {
+ if (auto prop_val = parser(prop_str.value())) {
+ return prop_val.value();
+ }
+ logger->log_error("Invalid '%s' value, using default '%s'",
prop_name_prefix + suffix, fallback);
+ } else {
+ logger->log_info("Missing '%s' value, using default '%s'",
prop_name_prefix + suffix, fallback);
+ }
+ return parser(fallback).value();
+ };
+
+ auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+ int val;
+ if (DataSizeValue::StringToInt(str, val)) {
+ return val;
+ }
+ return {};
+ };
+
+ config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+ config.flush_period = readPropertyOr(".flush.period",
TimePeriodValue::fromString, "5 s").getMilliseconds();
+ config.rate_limit = readPropertyOr(".rate.limit",
TimePeriodValue::fromString, "10 min").getMilliseconds();
+ config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1
MB");
+ config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+ config.ssl_service_name = logger_properties->getString(prop_name_prefix +
".ssl.context.service");
+
+ return std::shared_ptr<AlertSink>(new AlertSink(std::move(config),
std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider*
controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+ auto services = std::make_unique<Services>();
+
+ services->agent_id = std::move(agent_id);
+
+ if (config_.ssl_service_name) {
+ if (!controller) {
+ logger_->log_error("Could not find service '%s': no service provider",
config_.ssl_service_name.value());
+ return;
+ }
+ if (auto service =
controller->getControllerService(config_.ssl_service_name.value())) {
+ if (auto ssl_service =
std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+ services->ssl_service = ssl_service;
+ } else {
+ logger_->log_error("Service '%s' is not an SSLContextService",
config_.ssl_service_name.value());
+ return;
+ }
+ } else {
+ logger_->log_error("Could not find service '%s'",
config_.ssl_service_name.value());
+ return;
+ }
+ }
+
+ services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+ // this method is protected upstream in base_sink by a mutex
+
+ // TODO(adebreceni): revisit this after MINIFICPP-1903
+ std::string payload(msg.payload.data(), msg.payload.size());
+ utils::SMatch match;
+ if (!utils::regexMatch(payload, match, config_.filter)) {
+ return;
+ }
+ size_t hash = 0;
+ for (size_t idx = 1; idx < match.size(); ++idx) {
+ std::string submatch = match[idx].str();
+ hash = utils::hash_combine(hash, std::hash<std::string>{}(submatch));
+ }
+ if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+ return;
+ }
+
+ spdlog::memory_buf_t formatted;
+ formatter_->format(msg, formatted);
+
+ buffer_.modify([&] (LogBuffer& log_buf) {
+ log_buf.size_ += formatted.size();
+ log_buf.data_.emplace_back(std::string{formatted.data(),
formatted.size()}, hash);
+ });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+ while (running_) {
+ {
+ std::unique_lock lock(mtx_);
+ if (clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}))
{
+ break;
+ }
+ next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+ }
+ std::unique_ptr<Services> services(services_.exchange(nullptr));
+ if (!services || !running_) {
+ continue;
+ }
+ try {
+ send(*services);
+ } catch (const std::exception& err) {
+ logger_->log_error("Exception while sending logs: %s", err.what());
+ } catch (...) {
+ logger_->log_error("Unknown exception while sending logs");
+ }
+ Services* expected{nullptr};
+ // only restore the services pointer if no initialize set it to something
else meanwhile
+ if (services_.compare_exchange_strong(expected, services.get())) {
+ (void)services.release();
+ }
+ }
+}
+
+AlertSink::~AlertSink() {
+ {
+ std::lock_guard lock(mtx_);
+ running_ = false;
+ cv_.notify_all();
+ }
+ if (flush_thread_.joinable()) {
+ flush_thread_.join();
+ }
+ delete services_.exchange(nullptr);
+}
+
+void AlertSink::send(Services& services) {
+ LogBuffer logs;
+ buffer_.commit();
+ if (!buffer_.tryDequeue(logs)) {
+ return;
+ }
+
+ auto client =
core::ClassLoader::getDefaultClassLoader().instantiate<utils::BaseHTTPClient>("HTTPClient",
"HTTPClient");
+ if (!client) {
+ logger_->log_error("Could not instantiate a HTTPClient object");
+ return;
+ }
+ client->initialize("PUT", config_.url, services.ssl_service);
+
+ rapidjson::Document doc(rapidjson::kObjectType);
+ std::string agent_id = services.agent_id->getAgentIdentifier();
+ doc.AddMember("agentId", rapidjson::Value(agent_id.data(),
agent_id.length()), doc.GetAllocator());
+ doc.AddMember("alerts", rapidjson::Value(rapidjson::kArrayType),
doc.GetAllocator());
+ for (const auto& [log, _] : logs.data_) {
+ doc["alerts"].PushBack(rapidjson::Value(log.data(), log.size()),
doc.GetAllocator());
+ }
+ rapidjson::StringBuffer buffer;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ doc.Accept(writer);
+
+ auto data_input = std::make_unique<utils::ByteInputCallback>();
+ auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+ data_input->write(std::string(buffer.GetString(), buffer.GetSize()));
+ data_cb->ptr = data_input.get();
+ client->setUploadCallback(data_cb.get());
+ client->setContentType("application/json");
+
+ bool req_success = client->submit();
+
+ int64_t resp_code = client->getResponseCode();
+ const bool response_success = 200 <= resp_code && resp_code < 300;
+ const bool client_err = 400 <= resp_code && resp_code < 500;
+ const bool server_err = 500 <= resp_code && resp_code < 600;
+ if (client_err || server_err) {
+ logger_->log_error("Error response code '" "%" PRId64 "' from '%s'",
resp_code, config_.url);
+ } else if (!response_success) {
+ logger_->log_warn("Non-success response code '" "%" PRId64 "' from '%s'",
resp_code, config_.url);
+ } else {
+ logger_->log_debug("Response code '" "%" PRId64 "' from '%s'", resp_code,
config_.url);
+ }
+
+ if (!req_success) {
+ logger_->log_error("Failed to send alert request");
+ }
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::allocate(size_t /*size*/) {
+ return {};
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::commit() {
+ return std::move(*this);
+}
+
+size_t AlertSink::LogBuffer::size() const {
+ return size_;
+}
+
+bool AlertSink::LiveLogSet::tryAdd(std::chrono::milliseconds now, size_t hash)
{
+ auto limit = now - lifetime_;
+ while (!timestamped_hashes_.empty() && timestamped_hashes_.front().first <
limit) {
+ hashes_to_ignore_.erase(timestamped_hashes_.front().second);
+ timestamped_hashes_.pop_front();
+ }
+
+ if (!hashes_to_ignore_.insert(hash).second) {
+ return false;
+ }
+
+ timestamped_hashes_.emplace_back(now, hash);
+ return true;
+}
+
+} // namespace org::apache::nifi::minifi::core::logging
diff --git a/libminifi/src/properties/Properties.cpp
b/libminifi/src/properties/Properties.cpp
index b8d127a88..55a930c02 100644
--- a/libminifi/src/properties/Properties.cpp
+++ b/libminifi/src/properties/Properties.cpp
@@ -25,16 +25,11 @@
#include "core/logging/LoggerConfiguration.h"
#include "properties/PropertiesFile.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
-#define TRACE_BUFFER_SIZE 512
-
-Properties::Properties(const std::string& name)
+Properties::Properties(std::string name)
: logger_(core::logging::LoggerFactory<Properties>::getLogger()),
- name_(name) {
+ name_(std::move(name)) {
}
// Get the config value
@@ -158,7 +153,4 @@ std::map<std::string, std::string>
Properties::getProperties() const {
return properties;
}
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp
index a62c98a9f..7d283a888 100644
--- a/libminifi/src/utils/Id.cpp
+++ b/libminifi/src/utils/Id.cpp
@@ -32,7 +32,7 @@
#include <memory>
#include <string>
#include <limits>
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/LoggerFactory.h"
#ifdef WIN32
#include "Rpc.h"
diff --git a/libminifi/src/utils/TimeUtil.cpp b/libminifi/src/utils/TimeUtil.cpp
new file mode 100644
index 000000000..04df8d6fb
--- /dev/null
+++ b/libminifi/src/utils/TimeUtil.cpp
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/TimeUtil.h"
+
+namespace org::apache::nifi::minifi::utils::timeutils {
+
+static std::mutex global_clock_mtx;
+static std::shared_ptr<SteadyClock>
global_clock{std::make_shared<SteadyClock>()};
+
+std::shared_ptr<SteadyClock> getClock() {
+ std::lock_guard lock(global_clock_mtx);
+ return global_clock;
+}
+
+// test-only utility to specify what clock to use
+void setClock(std::shared_ptr<SteadyClock> clock) {
+ std::lock_guard lock(global_clock_mtx);
+ global_clock = std::move(clock);
+}
+
+} // namespace org::apache::nifi::minifi::utils::timeutils
diff --git a/libminifi/test/unit/SwapTestController.h
b/libminifi/test/unit/SwapTestController.h
index 0a921a754..f6650d2ee 100644
--- a/libminifi/test/unit/SwapTestController.h
+++ b/libminifi/test/unit/SwapTestController.h
@@ -166,10 +166,8 @@ struct VerifiedQueue {
return result;
}
- VerifiedQueue(std::shared_ptr<minifi::SwapManager> swap_manager,
std::unique_ptr<utils::timeutils::SteadyClock> clock)
- : impl(std::move(swap_manager)) {
- FlowFileQueueTestAccessor::get_clock_(impl) = std::move(clock);
- }
+ explicit VerifiedQueue(std::shared_ptr<minifi::SwapManager> swap_manager)
+ : impl(std::move(swap_manager)) {}
utils::FlowFileQueue impl;
FlowFilePtrVec ref_;
@@ -181,9 +179,9 @@ class SwapTestController : public TestController {
content_repo_ =
std::make_shared<core::repository::VolatileContentRepository>();
flow_repo_ = std::make_shared<SwappingFlowFileTestRepo>();
flow_repo_->loadComponent(content_repo_);
- auto clock = std::make_unique<utils::ManualSteadyClock>();
- clock_ = clock.get();
- queue_ =
std::make_shared<VerifiedQueue>(std::static_pointer_cast<minifi::SwapManager>(flow_repo_),
std::move(clock));
+ clock_ = std::make_shared<utils::ManualClock>();
+ utils::timeutils::setClock(clock_);
+ queue_ =
std::make_shared<VerifiedQueue>(std::static_pointer_cast<minifi::SwapManager>(flow_repo_));
}
void setLimits(size_t min_size, size_t target_size, size_t max_size) {
@@ -235,6 +233,5 @@ class SwapTestController : public TestController {
std::shared_ptr<SwappingFlowFileTestRepo> flow_repo_;
std::shared_ptr<core::repository::VolatileContentRepository> content_repo_;
std::shared_ptr<VerifiedQueue> queue_;
- // owned by the queue_
- utils::ManualSteadyClock* clock_;
+ std::shared_ptr<utils::ManualClock> clock_;
};