This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 80c2e7a486b2337d8911431641124bbd0d041827 Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Jul 11 16:00:36 2022 +0200 MINIFICPP-1866 Secure connection for ListenSyslog, ListenTCP Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1370 --- PROCESSORS.md | 41 +++++++--- extensions/civetweb/tests/ListenHTTPTests.cpp | 46 +++++------ extensions/http-curl/tests/C2MetricsTest.cpp | 8 +- .../http-curl/tests/unit/InvokeHTTPTests.cpp | 6 +- extensions/librdkafka/KafkaProcessorBase.cpp | 23 +----- extensions/librdkafka/KafkaProcessorBase.h | 3 +- extensions/librdkafka/PublishKafka.cpp | 4 +- extensions/librdkafka/PublishKafka.h | 2 +- extensions/librdkafka/rdkafka_utils.h | 8 +- extensions/script/tests/LuaScriptEngineTests.cpp | 17 ++-- .../script/tests/PythonScriptEngineTests.cpp | 14 ++-- .../processors/ListenSyslog.cpp | 43 +++++++++- .../standard-processors/processors/ListenSyslog.h | 13 +++- .../standard-processors/processors/ListenTCP.cpp | 37 ++++++++- .../standard-processors/processors/ListenTCP.h | 14 +++- .../processors/NetworkListenerProcessor.cpp | 58 +++++++++----- .../processors/NetworkListenerProcessor.h | 17 +++- .../standard-processors/tests/CMakeLists.txt | 10 ++- .../tests/unit/ListenSyslogTests.cpp | 80 +++++++++++++------ .../tests/unit/ListenTcpTests.cpp | 87 +++++++++++++++++++-- .../tests/unit/resources/ca_cert.crt | 20 +++++ .../tests/unit/resources/cert_and_private_key.pem | 46 +++++++++++ .../tests/CWELCustomProviderTests.cpp | 8 +- .../tests/ConsumeWindowsEventLogTests.cpp | 12 ++- libminifi/include/Connection.h | 8 +- libminifi/include/utils/FlowFileQueue.h | 16 ++-- libminifi/include/utils/net/Server.h | 8 +- .../include/utils/net/SessionHandlingServer.h | 67 ++++++++++++++++ libminifi/include/utils/net/Ssl.h | 42 ++++++++++ .../include/utils/net/{TcpServer.h => SslServer.h} | 47 +++++------ libminifi/include/utils/net/TcpServer.h | 21 ++--- libminifi/src/utils/net/Ssl.cpp | 52 +++++++++++++ libminifi/src/utils/net/SslServer.cpp | 91 ++++++++++++++++++++++ libminifi/src/utils/net/TcpServer.cpp | 23 +----- libminifi/test/Utils.h | 48 +++++++++++- libminifi/test/rocksdb-tests/SwapTests.cpp | 14 ++-- libminifi/test/sql-tests/ExecuteSQLTests.cpp | 14 ++-- .../test/sql-tests/QueryDatabaseTableTests.cpp | 14 ++-- libminifi/test/unit/FlowFileQueueSwapTests.cpp | 10 ++- libminifi/test/unit/SwapTestController.h | 21 ++--- 40 files changed, 856 insertions(+), 257 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index f06cf7165..8e7e6e328 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -1263,13 +1263,15 @@ With parsing disabled all message will be routed to the success relationship, bu In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|---------------------------|---------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Listening Port | 514 | | The port for Syslog communication. (Well-known ports (0-1023) require root access) | -| Protocol | UDP | UDP<br>TCP<br> | The protocol for Syslog communication. | -| Parse Messages | false | false<br>true | Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only contain the sender, protocol, and port, and no additional attributes. | -| Max Batch Size | 500 | | The maximum number of Syslog events to process at a time. | -| Max Size of Message Queue | 10000 | | Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the message is ignored. If set to zero the buffer is unlimited. | +| Name | Default Value | Allowable Values | Description | +|---------------------------|---------------|----------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Listening Port | 514 | | The port for Syslog communication. (Well-known ports (0-1023) require root access) | +| Protocol | UDP | UDP<br>TCP<br> | The protocol for Syslog communication. | +| Parse Messages | false | false<br>true | Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only contain the sender, protocol, and port, and no additional attributes. | +| Max Batch Size | 500 | | The maximum number of Syslog events to process at a time. | +| Max Size of Message Queue | 10000 | | Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the message is ignored. If set to zero the buffer is unlimited. | +| SSL Context Service | | | The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection. This Property is only considered if the \<Protocol\> Property has a value of "TCP". | +| Client Auth | NONE | NONE<br/>WANT<br/>REQUIRED | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. | ### Relationships @@ -1312,11 +1314,26 @@ Listens for incoming TCP connections and reads data from each connection using a In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|-------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **Listening Port** | | | The port to listen on for communication. | -| **Max Batch Size** | 500 | | The maximum number of messages to process at a time. | -| **Max Size of Message Queue** | 10000 | | Maximum number of messages allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the message is ignored. If set to zero the buffer is unlimited. | +| Name | Default Value | Allowable Values | Description | +|-------------------------------|---------------|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Listening Port** | | | The port to listen on for communication. | +| **Max Batch Size** | 500 | | The maximum number of messages to process at a time. | +| **Max Size of Message Queue** | 10000 | | Maximum number of messages allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the message is ignored. If set to zero the buffer is unlimited. | +| SSL Context Service | | | The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection. | +| Client Auth | NONE | NONE<br/>WANT<br/>REQUIRED | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. | + +### Relationships + +| Name | Description | +|---------|--------------------------------------------------------------------| +| success | Messages received successfully will be sent out this relationship. | + +### Output Attributes + +| Attribute | Description | Requirements | +|--------------------------|-----------------------------------------------|------------------------| +| _tcp.port_ | The sending port the messages were received. | - | +| _tcp.sender_ | The sending host of the messages. | - | ## ListFile diff --git a/extensions/civetweb/tests/ListenHTTPTests.cpp b/extensions/civetweb/tests/ListenHTTPTests.cpp index 3b577345a..e9d326e70 100644 --- a/extensions/civetweb/tests/ListenHTTPTests.cpp +++ b/extensions/civetweb/tests/ListenHTTPTests.cpp @@ -70,7 +70,7 @@ class ListenHTTPTestsFixture { REQUIRE(!tmp_dir.empty()); // Define test input file - std::string test_input_file = utils::file::FileUtils::concat_path(tmp_dir, "test"); + std::string test_input_file = minifi::utils::file::FileUtils::concat_path(tmp_dir, "test"); { std::ofstream os(test_input_file); os << "Hello response body"; @@ -116,11 +116,11 @@ class ListenHTTPTestsFixture { void create_ssl_context_service(const char* ca, const char* client_cert) { auto config = std::make_shared<minifi::Configure>(); if (ca != nullptr) { - config->set(minifi::Configure::nifi_security_client_ca_certificate, utils::file::FileUtils::get_executable_dir() + "/resources/" + ca); + config->set(minifi::Configure::nifi_security_client_ca_certificate, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/" + ca); } if (client_cert != nullptr) { - config->set(minifi::Configure::nifi_security_client_certificate, utils::file::FileUtils::get_executable_dir() + "/resources/" + client_cert); - config->set(minifi::Configure::nifi_security_client_private_key, utils::file::FileUtils::get_executable_dir() + "/resources/" + client_cert); + config->set(minifi::Configure::nifi_security_client_certificate, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/" + client_cert); + config->set(minifi::Configure::nifi_security_client_private_key, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/" + client_cert); config->set(minifi::Configure::nifi_security_client_pass_phrase, "Password12"); } ssl_context_service = std::make_shared<minifi::controllers::SSLContextService>("SSLContextService", config); @@ -165,10 +165,10 @@ class ListenHTTPTestsFixture { if (!update_attribute->getDynamicProperty("mime.type", content_type)) { content_type = "application/octet-stream"; } - REQUIRE(content_type == utils::StringUtils::trim(client->getResponseHeaderMap().at("Content-type"))); - REQUIRE("19" == utils::StringUtils::trim(client->getResponseHeaderMap().at("Content-length"))); + REQUIRE(content_type == minifi::utils::StringUtils::trim(client->getResponseHeaderMap().at("Content-type"))); + REQUIRE("19" == minifi::utils::StringUtils::trim(client->getResponseHeaderMap().at("Content-length"))); } else { - REQUIRE("0" == utils::StringUtils::trim(client->getResponseHeaderMap().at("Content-length"))); + REQUIRE("0" == minifi::utils::StringUtils::trim(client->getResponseHeaderMap().at("Content-length"))); } } @@ -425,7 +425,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP Batch tests", "[batch]") { #ifdef OPENSSL_SUPPORT TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS without CA", "[basic][https]") { - plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); + plan->setProperty(listen_http, "SSL Certificate", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); create_ssl_context_service("goodCA.crt", nullptr); @@ -446,8 +446,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS without CA", "[basic][https]") { } TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS without client cert", "[basic][https]") { - plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); - plan->setProperty(listen_http, "SSL Certificate Authority", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); + plan->setProperty(listen_http, "SSL Certificate", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); + plan->setProperty(listen_http, "SSL Certificate Authority", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); create_ssl_context_service("goodCA.crt", nullptr); @@ -468,8 +468,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS without client cert", "[basic][h } TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from good CA", "[https]") { - plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); - plan->setProperty(listen_http, "SSL Certificate Authority", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); + plan->setProperty(listen_http, "SSL Certificate", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); + plan->setProperty(listen_http, "SSL Certificate Authority", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); plan->setProperty(listen_http, "SSL Verify Peer", "yes"); create_ssl_context_service("goodCA.crt", "goodCA_goodClient.pem"); @@ -491,8 +491,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from good CA", } TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with PKCS12 client cert from good CA", "[https]") { - plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); - plan->setProperty(listen_http, "SSL Certificate Authority", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); + plan->setProperty(listen_http, "SSL Certificate", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); + plan->setProperty(listen_http, "SSL Certificate Authority", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); plan->setProperty(listen_http, "SSL Verify Peer", "yes"); create_ssl_context_service("goodCA.crt", "goodCA_goodClient.p12"); @@ -514,8 +514,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with PKCS12 client cert from goo } TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from bad CA", "[https]") { - plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); - plan->setProperty(listen_http, "SSL Certificate Authority", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); + plan->setProperty(listen_http, "SSL Certificate", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); + plan->setProperty(listen_http, "SSL Certificate Authority", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); bool should_succeed = false; int64_t response_code = 0; @@ -560,8 +560,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from bad CA", " } TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with matching DN", "[https][DN]") { - plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); - plan->setProperty(listen_http, "SSL Certificate Authority", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); + plan->setProperty(listen_http, "SSL Certificate", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); + plan->setProperty(listen_http, "SSL Certificate Authority", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); plan->setProperty(listen_http, "Authorized DN Pattern", ".*/CN=good\\..*"); plan->setProperty(listen_http, "SSL Verify Peer", "yes"); @@ -584,8 +584,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with matching D } TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with non-matching DN", "[https][DN]") { - plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); - plan->setProperty(listen_http, "SSL Certificate Authority", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); + plan->setProperty(listen_http, "SSL Certificate", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); + plan->setProperty(listen_http, "SSL Certificate Authority", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); plan->setProperty(listen_http, "Authorized DN Pattern", ".*/CN=good\\..*"); int64_t response_code = 0; @@ -630,8 +630,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with non-matchi #if CURL_AT_LEAST_VERSION(7, 54, 0) TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS minimum SSL version", "[https]") { - plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); - plan->setProperty(listen_http, "SSL Certificate Authority", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); + plan->setProperty(listen_http, "SSL Certificate", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/server.pem")); + plan->setProperty(listen_http, "SSL Certificate Authority", minifi::utils::file::FileUtils::concat_path(minifi::utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt")); SECTION("GET") { method = "GET"; @@ -654,7 +654,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS minimum SSL version", "[https]") if (method == "POST") { client->setPostFields(payload); } - REQUIRE(client->setSpecificSSLVersion(utils::SSLVersion::TLSv1_1)); + REQUIRE(client->setSpecificSSLVersion(minifi::utils::SSLVersion::TLSv1_1)); test_connect({HttpResponseExpectations{false, 0}}, 0); } diff --git a/extensions/http-curl/tests/C2MetricsTest.cpp b/extensions/http-curl/tests/C2MetricsTest.cpp index 0e06d7478..99fe46e52 100644 --- a/extensions/http-curl/tests/C2MetricsTest.cpp +++ b/extensions/http-curl/tests/C2MetricsTest.cpp @@ -179,8 +179,8 @@ class MetricsHandler: public HeartbeatHandler { [[nodiscard]] static std::string getReplacementConfigAsJsonValue(const std::string& replacement_config_path) { std::ifstream is(replacement_config_path); auto content = std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()); - content = utils::StringUtils::replaceAll(content, "\n", "\\n"); - content = utils::StringUtils::replaceAll(content, "\"", "\\\""); + content = minifi::utils::StringUtils::replaceAll(content, "\n", "\\n"); + content = minifi::utils::StringUtils::replaceAll(content, "\"", "\\\""); return content; } @@ -206,8 +206,8 @@ int main(int argc, char **argv) { harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "GetTCPMetrics"); harness.setKeyDir(args.key_dir); auto replacement_path = args.test_file; - utils::StringUtils::replaceAll(replacement_path, "TestC2Metrics", "TestC2MetricsUpdate"); - utils::StringUtils::replaceAll(replacement_path, "/", std::string(1, org::apache::nifi::minifi::utils::file::FileUtils::get_separator())); + minifi::utils::StringUtils::replaceAll(replacement_path, "TestC2Metrics", "TestC2MetricsUpdate"); + minifi::utils::StringUtils::replaceAll(replacement_path, "/", std::string(1, org::apache::nifi::minifi::utils::file::FileUtils::get_separator())); org::apache::nifi::minifi::test::MetricsHandler handler(metrics_updated_successfully, harness.getConfiguration(), replacement_path); harness.setUrl(args.url, &handler); harness.run(args.test_file); diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp index 395b99808..9d04be6c4 100644 --- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp +++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp @@ -84,7 +84,7 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp"); invokehttp->initialize(); - utils::Identifier invokehttp_uuid = invokehttp->getUUID(); + minifi::utils::Identifier invokehttp_uuid = invokehttp->getUUID(); REQUIRE(invokehttp_uuid); auto node = std::make_shared<core::ProcessorNode>(invokehttp.get()); @@ -139,10 +139,10 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp"); invokehttp->initialize(); - utils::Identifier processoruuid = listenhttp->getUUID(); + minifi::utils::Identifier processoruuid = listenhttp->getUUID(); REQUIRE(processoruuid); - utils::Identifier invokehttp_uuid = invokehttp->getUUID(); + minifi::utils::Identifier invokehttp_uuid = invokehttp->getUUID(); REQUIRE(invokehttp_uuid); auto configuration = std::make_shared<minifi::Configure>(); diff --git a/extensions/librdkafka/KafkaProcessorBase.cpp b/extensions/librdkafka/KafkaProcessorBase.cpp index 1a3fd142d..7e57b5ce4 100644 --- a/extensions/librdkafka/KafkaProcessorBase.cpp +++ b/extensions/librdkafka/KafkaProcessorBase.cpp @@ -22,27 +22,8 @@ namespace org::apache::nifi::minifi::processors { -std::optional<utils::SSL_data> KafkaProcessorBase::getSslData(core::ProcessContext& context) const { - std::string ssl_service_name; - if (context.getProperty(SSLContextService.getName(), ssl_service_name) && !ssl_service_name.empty()) { - std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(ssl_service_name); - if (service) { - auto ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service); - utils::SSL_data ssl_data; - ssl_data.ca_loc = ssl_service->getCACertificate(); - ssl_data.cert_loc = ssl_service->getCertificateFile(); - ssl_data.key_loc = ssl_service->getPrivateKeyFile(); - ssl_data.key_pw = ssl_service->getPassphrase(); - return ssl_data; - } else { - logger_->log_warn("SSL Context Service property is set to '%s', but the controller service could not be found.", ssl_service_name); - return std::nullopt; - } - } else if (security_protocol_ == SecurityProtocolOption::SSL || security_protocol_ == SecurityProtocolOption::SASL_SSL) { - logger_->log_warn("Security protocol is set to %s, but no valid SSL Context Service property is set.", security_protocol_.toString()); - } - - return std::nullopt; +std::optional<utils::net::SslData> KafkaProcessorBase::getSslData(core::ProcessContext& context) const { + return utils::net::getSslData(context, SSLContextService, logger_); } void KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config) { diff --git a/extensions/librdkafka/KafkaProcessorBase.h b/extensions/librdkafka/KafkaProcessorBase.h index ab4b4b222..013777871 100644 --- a/extensions/librdkafka/KafkaProcessorBase.h +++ b/extensions/librdkafka/KafkaProcessorBase.h @@ -23,6 +23,7 @@ #include "core/Processor.h" #include "rdkafka_utils.h" #include "utils/Enum.h" +#include "utils/net/Ssl.h" namespace org::apache::nifi::minifi::processors { @@ -67,7 +68,7 @@ class KafkaProcessorBase : public core::Processor { } protected: - virtual std::optional<utils::SSL_data> getSslData(core::ProcessContext& context) const; + virtual std::optional<utils::net::SslData> getSslData(core::ProcessContext& context) const; void setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config); SecurityProtocolOption security_protocol_; diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp index 0c4a50a8c..470ed661c 100644 --- a/extensions/librdkafka/PublishKafka.cpp +++ b/extensions/librdkafka/PublishKafka.cpp @@ -632,12 +632,12 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<core::ProcessContext> &c return true; } -std::optional<utils::SSL_data> PublishKafka::getSslData(core::ProcessContext& context) const { +std::optional<utils::net::SslData> PublishKafka::getSslData(core::ProcessContext& context) const { if (auto result = KafkaProcessorBase::getSslData(context); result) { return result; } - utils::SSL_data ssl_data; + utils::net::SslData ssl_data; context.getProperty(SecurityCA.getName(), ssl_data.ca_loc); context.getProperty(SecurityCert.getName(), ssl_data.cert_loc); context.getProperty(SecurityPrivateKey.getName(), ssl_data.key_loc); diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h index 7e8f218e7..50a10354e 100644 --- a/extensions/librdkafka/PublishKafka.h +++ b/extensions/librdkafka/PublishKafka.h @@ -138,7 +138,7 @@ class PublishKafka : public KafkaProcessorBase { protected: bool configureNewConnection(const std::shared_ptr<core::ProcessContext> &context); bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name, const std::shared_ptr<core::FlowFile>& flow_file); - std::optional<utils::SSL_data> getSslData(core::ProcessContext& context) const override; + std::optional<utils::net::SslData> getSslData(core::ProcessContext& context) const override; private: KafkaConnectionKey key_; diff --git a/extensions/librdkafka/rdkafka_utils.h b/extensions/librdkafka/rdkafka_utils.h index 0282af81c..7c2e1bfb6 100644 --- a/extensions/librdkafka/rdkafka_utils.h +++ b/extensions/librdkafka/rdkafka_utils.h @@ -29,6 +29,7 @@ #include "core/logging/LoggerConfiguration.h" #include "utils/gsl.h" #include "rdkafka.h" +#include "utils/net/Ssl.h" namespace org { namespace apache { @@ -36,13 +37,6 @@ namespace nifi { namespace minifi { namespace utils { -struct SSL_data { - std::string ca_loc; - std::string cert_loc; - std::string key_loc; - std::string key_pw; -}; - enum class KafkaEncoding { UTF8, HEX diff --git a/extensions/script/tests/LuaScriptEngineTests.cpp b/extensions/script/tests/LuaScriptEngineTests.cpp index 9bbbf381a..1b6841fcd 100644 --- a/extensions/script/tests/LuaScriptEngineTests.cpp +++ b/extensions/script/tests/LuaScriptEngineTests.cpp @@ -20,21 +20,20 @@ #include "Utils.h" #include "lua/LuaScriptEngine.h" -using ScriptException = org::apache::nifi::minifi::script::ScriptException; -using LuaScriptEngine = org::apache::nifi::minifi::lua::LuaScriptEngine; +namespace org::apache::nifi::minifi::test { TEST_CASE("LuaScriptEngine errors during eval", "[luascriptengineeval]") { - LuaScriptEngine engine; + lua::LuaScriptEngine engine; REQUIRE_NOTHROW(engine.eval("print('foo')")); // The exception message comes from the lua engine REQUIRE_THROWS_MATCHES( engine.eval("shout('foo')"), - ScriptException, - ExceptionSubStringMatcher<ScriptException>({"global 'shout' is not callable (a nil value)", "attempt to call a nil value", "attempt to call global 'shout'"})); + script::ScriptException, + utils::ExceptionSubStringMatcher<script::ScriptException>({"global 'shout' is not callable (a nil value)", "attempt to call a nil value", "attempt to call global 'shout'"})); } TEST_CASE("LuaScriptEngine errors during call", "[luascriptenginecall]") { - LuaScriptEngine engine; + lua::LuaScriptEngine engine; REQUIRE_NOTHROW(engine.eval(R"( function foo() print('foo') @@ -48,6 +47,8 @@ TEST_CASE("LuaScriptEngine errors during call", "[luascriptenginecall]") { // The exception message comes from the lua engine REQUIRE_THROWS_MATCHES( engine.call("bar"), - ScriptException, - ExceptionSubStringMatcher<ScriptException>({"global 'shout' is not callable (a nil value)", "attempt to call a nil value", "attempt to call global 'shout'"})); + script::ScriptException, + utils::ExceptionSubStringMatcher<script::ScriptException>({"global 'shout' is not callable (a nil value)", "attempt to call a nil value", "attempt to call global 'shout'"})); } + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/script/tests/PythonScriptEngineTests.cpp b/extensions/script/tests/PythonScriptEngineTests.cpp index ee9314730..f36bd2634 100644 --- a/extensions/script/tests/PythonScriptEngineTests.cpp +++ b/extensions/script/tests/PythonScriptEngineTests.cpp @@ -20,18 +20,16 @@ #include "Utils.h" #include "python/PythonScriptEngine.h" -using PythonScriptEngine = org::apache::nifi::minifi::python::PythonScriptEngine; -using ScriptException = org::apache::nifi::minifi::script::ScriptException; - +namespace org::apache::nifi::minifi::test { TEST_CASE("PythonScriptEngine errors during eval", "[pythonscriptengineeval]") { - PythonScriptEngine engine; + python::PythonScriptEngine engine; REQUIRE_NOTHROW(engine.eval("print('foo')")); - REQUIRE_THROWS_MATCHES(engine.eval("shout('foo')"), ScriptException, ExceptionSubStringMatcher<ScriptException>({"name 'shout' is not defined"})); + REQUIRE_THROWS_MATCHES(engine.eval("shout('foo')"), script::ScriptException, utils::ExceptionSubStringMatcher<script::ScriptException>({"name 'shout' is not defined"})); } TEST_CASE("PythonScriptEngine errors during call", "[luascriptenginecall]") { - PythonScriptEngine engine; + python::PythonScriptEngine engine; REQUIRE_NOTHROW(engine.eval(R"( def foo(): print('foo') @@ -41,5 +39,7 @@ TEST_CASE("PythonScriptEngine errors during call", "[luascriptenginecall]") { )")); REQUIRE_NOTHROW(engine.call("foo")); - REQUIRE_THROWS_MATCHES(engine.call("bar"), ScriptException, ExceptionSubStringMatcher<ScriptException>({"name 'shout' is not defined"})); + REQUIRE_THROWS_MATCHES(engine.call("bar"), script::ScriptException, utils::ExceptionSubStringMatcher<script::ScriptException>({"name 'shout' is not defined"})); } + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp index d9db46e90..e08d0cb15 100644 --- a/extensions/standard-processors/processors/ListenSyslog.cpp +++ b/extensions/standard-processors/processors/ListenSyslog.cpp @@ -21,6 +21,7 @@ #include "core/ProcessSession.h" #include "core/PropertyBuilder.h" #include "core/Resource.h" +#include "controllers/SSLContextService.h" namespace org::apache::nifi::minifi::processors { @@ -56,6 +57,20 @@ const core::Property ListenSyslog::MaxQueueSize( "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.") ->withDefaultValue<uint64_t>(10000)->build()); +const core::Property ListenSyslog::SSLContextService( + core::PropertyBuilder::createProperty("SSL Context Service") + ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection. " + "This Property is only considered if the <Protocol> Property has a value of \"TCP\".") + ->asType<minifi::controllers::SSLContextService>() + ->build()); + +const core::Property ListenSyslog::ClientAuth( + core::PropertyBuilder::createProperty("Client Auth") + ->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") + ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE)) + ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values()) + ->build()); + const core::Relationship ListenSyslog::Success("success", "Incoming messages that match the expected format when parsing will be sent to this relationship. " "When Parse Messages is set to false, all incoming message will be sent to this relationship."); const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages that do not match the expected format when parsing will be sent to this relationship."); @@ -91,7 +106,13 @@ void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& conte utils::net::IpProtocol protocol; context->getProperty(ProtocolProperty.getName(), protocol); - startServer(*context, MaxBatchSize, MaxQueueSize, Port, protocol); + if (protocol == utils::net::IpProtocol::TCP) { + startTcpServer(*context); + } else if (protocol == utils::net::IpProtocol::UDP) { + startUdpServer(*context); + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid protocol"); + } } void ListenSyslog::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) { @@ -135,6 +156,26 @@ void ListenSyslog::transferAsFlowFile(const utils::net::Message& message, core:: session.transfer(flow_file, valid ? Success : Invalid); } +const core::Property& ListenSyslog::getMaxBatchSizeProperty() { + return MaxBatchSize; +} + +const core::Property& ListenSyslog::getMaxQueueSizeProperty() { + return MaxQueueSize; +} + +const core::Property& ListenSyslog::getPortProperty() { + return Port; +} + +const core::Property& ListenSyslog::getSslContextProperty() { + return SSLContextService; +} + +const core::Property& ListenSyslog::getClientAuthProperty() { + return ClientAuth; +} + REGISTER_RESOURCE(ListenSyslog, Processor); } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/processors/ListenSyslog.h b/extensions/standard-processors/processors/ListenSyslog.h index 44445da9b..4f60a3ab7 100644 --- a/extensions/standard-processors/processors/ListenSyslog.h +++ b/extensions/standard-processors/processors/ListenSyslog.h @@ -45,13 +45,17 @@ class ListenSyslog : public NetworkListenerProcessor { EXTENSIONAPI static const core::Property MaxBatchSize; EXTENSIONAPI static const core::Property ParseMessages; EXTENSIONAPI static const core::Property MaxQueueSize; + EXTENSIONAPI static const core::Property SSLContextService; + EXTENSIONAPI static const core::Property ClientAuth; static auto properties() { return std::array{ Port, ProtocolProperty, MaxBatchSize, ParseMessages, - MaxQueueSize + MaxQueueSize, + SSLContextService, + ClientAuth }; } @@ -62,6 +66,13 @@ class ListenSyslog : public NetworkListenerProcessor { void initialize() override; void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override; + protected: + const core::Property& getMaxBatchSizeProperty() override; + const core::Property& getMaxQueueSizeProperty() override; + const core::Property& getPortProperty() override; + const core::Property& getSslContextProperty() override; + const core::Property& getClientAuthProperty() override; + private: void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override; diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenTCP.cpp index c7d1be6d0..20d856182 100644 --- a/extensions/standard-processors/processors/ListenTCP.cpp +++ b/extensions/standard-processors/processors/ListenTCP.cpp @@ -18,6 +18,8 @@ #include "core/Resource.h" #include "core/PropertyBuilder.h" +#include "controllers/SSLContextService.h" +#include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::processors { @@ -43,6 +45,19 @@ const core::Property ListenTCP::MaxBatchSize( ->isRequired(true) ->build()); +const core::Property ListenTCP::SSLContextService( + core::PropertyBuilder::createProperty("SSL Context Service") + ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.") + ->asType<minifi::controllers::SSLContextService>() + ->build()); + +const core::Property ListenTCP::ClientAuth( + core::PropertyBuilder::createProperty("Client Auth") + ->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") + ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE)) + ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values()) + ->build()); + const core::Relationship ListenTCP::Success("success", "Messages received successfully will be sent out this relationship."); void ListenTCP::initialize() { @@ -52,7 +67,7 @@ void ListenTCP::initialize() { void ListenTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { gsl_Expects(context); - startServer(*context, MaxBatchSize, MaxQueueSize, Port, utils::net::IpProtocol::TCP); + startTcpServer(*context); } void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) { @@ -63,6 +78,26 @@ void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::Pro session.transfer(flow_file, Success); } +const core::Property& ListenTCP::getMaxBatchSizeProperty() { + return MaxBatchSize; +} + +const core::Property& ListenTCP::getMaxQueueSizeProperty() { + return MaxQueueSize; +} + +const core::Property& ListenTCP::getPortProperty() { + return Port; +} + +const core::Property& ListenTCP::getSslContextProperty() { + return SSLContextService; +} + +const core::Property& ListenTCP::getClientAuthProperty() { + return ClientAuth; +} + REGISTER_RESOURCE(ListenTCP, Processor); } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/processors/ListenTCP.h b/extensions/standard-processors/processors/ListenTCP.h index 17f0aafc5..c720941b9 100644 --- a/extensions/standard-processors/processors/ListenTCP.h +++ b/extensions/standard-processors/processors/ListenTCP.h @@ -21,6 +21,7 @@ #include "NetworkListenerProcessor.h" #include "core/logging/LoggerConfiguration.h" +#include "utils/Enum.h" namespace org::apache::nifi::minifi::processors { @@ -36,11 +37,15 @@ class ListenTCP : public NetworkListenerProcessor { EXTENSIONAPI static const core::Property Port; EXTENSIONAPI static const core::Property MaxBatchSize; EXTENSIONAPI static const core::Property MaxQueueSize; + EXTENSIONAPI static const core::Property SSLContextService; + EXTENSIONAPI static const core::Property ClientAuth; static auto properties() { return std::array{ Port, MaxBatchSize, - MaxQueueSize + MaxQueueSize, + SSLContextService, + ClientAuth }; } @@ -50,6 +55,13 @@ class ListenTCP : public NetworkListenerProcessor { void initialize() override; void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override; + protected: + const core::Property& getMaxBatchSizeProperty() override; + const core::Property& getMaxQueueSizeProperty() override; + const core::Property& getPortProperty() override; + const core::Property& getSslContextProperty() override; + const core::Property& getClientAuthProperty() override; + private: void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override; }; diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp index 81442ec11..fa22d9096 100644 --- a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp +++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp @@ -17,6 +17,8 @@ #include "NetworkListenerProcessor.h" #include "utils/net/UdpServer.h" #include "utils/net/TcpServer.h" +#include "utils/net/Ssl.h" +#include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::processors { @@ -36,36 +38,56 @@ void NetworkListenerProcessor::onTrigger(const std::shared_ptr<core::ProcessCont } } -void NetworkListenerProcessor::startServer( - const core::ProcessContext& context, const core::Property& max_batch_size_prop, const core::Property& max_queue_size_prop, const core::Property& port_prop, utils::net::IpProtocol protocol) { - gsl_Expects(!server_thread_.joinable() && !server_); - context.getProperty(max_batch_size_prop.getName(), max_batch_size_); +NetworkListenerProcessor::ServerOptions NetworkListenerProcessor::readServerOptions(const core::ProcessContext& context) { + ServerOptions options; + context.getProperty(getMaxBatchSizeProperty().getName(), max_batch_size_); if (max_batch_size_ < 1) throw Exception(PROCESSOR_EXCEPTION, "Max Batch Size property is invalid"); uint64_t max_queue_size = 0; - context.getProperty(max_queue_size_prop.getName(), max_queue_size); - auto max_queue_size_opt = max_queue_size > 0 ? std::optional<uint64_t>(max_queue_size) : std::nullopt; - - int port; - context.getProperty(port_prop.getName(), port); + context.getProperty(getMaxQueueSizeProperty().getName(), max_queue_size); + options.max_queue_size = max_queue_size > 0 ? std::optional<uint64_t>(max_queue_size) : std::nullopt; - if (protocol == utils::net::IpProtocol::UDP) { - server_ = std::make_unique<utils::net::UdpServer>(max_queue_size_opt, port, logger_); - } else if (protocol == utils::net::IpProtocol::TCP) { - server_ = std::make_unique<utils::net::TcpServer>(max_queue_size_opt, port, logger_); - } else { - throw Exception(PROCESSOR_EXCEPTION, "Invalid protocol"); - } + context.getProperty(getPortProperty().getName(), options.port); + return options; +} +void NetworkListenerProcessor::startServer(const ServerOptions& options, utils::net::IpProtocol protocol) { server_thread_ = std::thread([this]() { server_->run(); }); logger_->log_debug("Started %s server on port %d with %s max queue size and %zu max batch size", protocol.toString(), - port, - max_queue_size_opt ? std::to_string(*max_queue_size_opt) : "unlimited", + options.port, + options.max_queue_size ? std::to_string(*options.max_queue_size) : "unlimited", max_batch_size_); } +void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& context) { + gsl_Expects(!server_thread_.joinable() && !server_); + auto options = readServerOptions(context); + + std::string ssl_value; + auto& ssl_prop = getSslContextProperty(); + if (context.getProperty(ssl_prop.getName(), ssl_value) && !ssl_value.empty()) { + auto ssl_data = utils::net::getSslData(context, ssl_prop, logger_); + if (!ssl_data || !ssl_data->isValid()) { + throw Exception(PROCESSOR_EXCEPTION, "SSL Context Service is set, but no valid SSL data was found!"); + } + auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, getClientAuthProperty()); + server_ = std::make_unique<utils::net::SslServer>(options.max_queue_size, options.port, logger_, *ssl_data, client_auth); + } else { + server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_); + } + + startServer(options, utils::net::IpProtocol::TCP); +} + +void NetworkListenerProcessor::startUdpServer(const core::ProcessContext& context) { + gsl_Expects(!server_thread_.joinable() && !server_); + auto options = readServerOptions(context); + server_ = std::make_unique<utils::net::UdpServer>(options.max_queue_size, options.port, logger_); + startServer(options, utils::net::IpProtocol::UDP); +} + void NetworkListenerProcessor::stopServer() { if (server_) { server_->stop(); diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.h b/extensions/standard-processors/processors/NetworkListenerProcessor.h index 58170e510..24ad94436 100644 --- a/extensions/standard-processors/processors/NetworkListenerProcessor.h +++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h @@ -26,6 +26,7 @@ #include "core/ProcessSession.h" #include "core/Property.h" #include "utils/net/Server.h" +#include "utils/net/SslServer.h" namespace org::apache::nifi::minifi::processors { @@ -51,10 +52,22 @@ class NetworkListenerProcessor : public core::Processor { } protected: + struct ServerOptions { + std::optional<uint64_t> max_queue_size; + int port = 0; + }; + void stopServer(); - void startServer( - const core::ProcessContext& context, const core::Property& max_batch_size_prop, const core::Property& max_queue_size_prop, const core::Property& port_prop, utils::net::IpProtocol protocol); + void startTcpServer(const core::ProcessContext& context); + void startUdpServer(const core::ProcessContext& context); + ServerOptions readServerOptions(const core::ProcessContext& context); + void startServer(const ServerOptions& options, utils::net::IpProtocol protocol); virtual void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) = 0; + virtual const core::Property& getMaxBatchSizeProperty() = 0; + virtual const core::Property& getMaxQueueSizeProperty() = 0; + virtual const core::Property& getPortProperty() = 0; + virtual const core::Property& getSslContextProperty() = 0; + virtual const core::Property& getClientAuthProperty() = 0; uint64_t max_batch_size_{500}; std::unique_ptr<utils::net::Server> server_; diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt index af5cf41d0..8fddd890e 100644 --- a/extensions/standard-processors/tests/CMakeLists.txt +++ b/extensions/standard-processors/tests/CMakeLists.txt @@ -47,6 +47,15 @@ FOREACH(testfile ${PROCESSOR_UNIT_TESTS}) add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) MATH(EXPR PROCESSOR_INT_TEST_COUNT "${PROCESSOR_INT_TEST_COUNT}+1") + + # Copy test resources + add_custom_command( + TARGET "${testfilename}" + POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_directory + "${CMAKE_SOURCE_DIR}/extensions/standard-processors/tests/unit/resources" + "$<TARGET_FILE_DIR:${testfilename}>/resources" + ) ENDFOREACH() message("-- Finished building ${PROCESSOR_INT_TEST_COUNT} processor unit test file(s)...") @@ -90,4 +99,3 @@ endif() add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml" "${TEST_RESOURCES}/") add_test(NAME TailFileCronTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFileCron.yml" "${TEST_RESOURCES}/") - diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp index 8266cd218..c9b8e472e 100644 --- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp +++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp @@ -20,12 +20,13 @@ #include "ListenSyslog.h" #include "SingleProcessorTestController.h" #include "Utils.h" +#include "controllers/SSLContextService.h" using ListenSyslog = org::apache::nifi::minifi::processors::ListenSyslog; using namespace std::literals::chrono_literals; -namespace org::apache::nifi::minifi::processors::testing { +namespace org::apache::nifi::minifi::test { constexpr uint64_t SYSLOG_PORT = 10255; @@ -260,7 +261,7 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164 TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") { const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog"); - test::SingleProcessorTestController controller{listen_syslog}; + SingleProcessorTestController controller{listen_syslog}; LogTestController::getInstance().setTrace<ListenSyslog>(); REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT))); REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2")); @@ -279,8 +280,8 @@ TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") { protocol = "TCP"; REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP")); controller.plan->scheduleProcessor(listen_syslog); - sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT); - sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT); + REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT)); + REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT)); } std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result; REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms)); @@ -294,7 +295,7 @@ TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") { TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") { const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog"); - test::SingleProcessorTestController controller{listen_syslog}; + SingleProcessorTestController controller{listen_syslog}; LogTestController::getInstance().setTrace<ListenSyslog>(); REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT))); REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100")); @@ -325,19 +326,18 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") { REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP")); controller.plan->scheduleProcessor(listen_syslog); std::this_thread::sleep_for(100ms); - sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_, - rfc5424_doc_example_2.unparsed_, - rfc5424_doc_example_3.unparsed_, - rfc5424_doc_example_4.unparsed_}, SYSLOG_PORT); - - sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_, - rfc3164_doc_example_2.unparsed_, - rfc3164_doc_example_3.unparsed_, - rfc3164_doc_example_4.unparsed_}, SYSLOG_PORT); - - - sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT); - sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT); + REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_, + rfc5424_doc_example_2.unparsed_, + rfc5424_doc_example_3.unparsed_, + rfc5424_doc_example_4.unparsed_}, SYSLOG_PORT)); + + REQUIRE(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_, + rfc3164_doc_example_2.unparsed_, + rfc3164_doc_example_3.unparsed_, + rfc3164_doc_example_4.unparsed_}, SYSLOG_PORT)); + + REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT)); + REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT)); } std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result; @@ -378,7 +378,7 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") { TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog]") { const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog"); - test::SingleProcessorTestController controller{listen_syslog}; + SingleProcessorTestController controller{listen_syslog}; LogTestController::getInstance().setTrace<ListenSyslog>(); REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT))); REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100")); @@ -401,7 +401,7 @@ TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog]") { TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") { const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog"); - test::SingleProcessorTestController controller{listen_syslog}; + SingleProcessorTestController controller{listen_syslog}; REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT))); REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "10")); REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false")); @@ -415,16 +415,16 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") { for (auto i = 0; i < 100; ++i) { sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT); } - CHECK(countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms)); + CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms)); } SECTION("TCP") { REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP")); controller.plan->scheduleProcessor(listen_syslog); for (auto i = 0; i < 100; ++i) { - sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, SYSLOG_PORT); + REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, SYSLOG_PORT)); } - CHECK(countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms)); + CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms)); } CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10); CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10); @@ -434,4 +434,36 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") { CHECK(controller.trigger().at(ListenSyslog::Success).empty()); } -} // namespace org::apache::nifi::minifi::processors::testing +TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") { + const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog"); + + SingleProcessorTestController controller{listen_syslog}; + auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService"); + const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir(); + REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), + minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt"))); + REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), + minifi::utils::file::concat_path(executable_dir, "resources/cert_and_private_key.pem"))); + REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), + minifi::utils::file::concat_path(executable_dir, "resources/cert_and_private_key.pem"))); + LogTestController::getInstance().setTrace<ListenSyslog>(); + REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT))); + REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2")); + REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false")); + REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP")); + REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService")); + ssl_context_service->enable(); + controller.plan->scheduleProcessor(listen_syslog); + REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, SYSLOG_PORT, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt"))); + REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, SYSLOG_PORT, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt"))); + + std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result; + REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms)); + CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1); + CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[1]) == invalid_syslog); + + check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], SYSLOG_PORT, "TCP"); + check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, "TCP"); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp index aad3f5dca..559db0c2f 100644 --- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp +++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp @@ -20,6 +20,7 @@ #include "processors/ListenTCP.h" #include "SingleProcessorTestController.h" #include "Utils.h" +#include "controllers/SSLContextService.h" using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP; @@ -37,14 +38,14 @@ void check_for_attributes(core::FlowFile& flow_file) { TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") { const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP"); - test::SingleProcessorTestController controller{listen_tcp}; + SingleProcessorTestController controller{listen_tcp}; LogTestController::getInstance().setTrace<ListenTCP>(); REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT))); REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2")); controller.plan->scheduleProcessor(listen_tcp); - sendMessagesViaTCP({"test_message_1"}, PORT); - sendMessagesViaTCP({"another_message"}, PORT); + REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, PORT)); + REQUIRE(utils::sendMessagesViaTCP({"another_message"}, PORT)); ProcessorTriggerResult result; REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 50ms)); CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == "test_message_1"); @@ -56,7 +57,7 @@ TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") { TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") { const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP"); - test::SingleProcessorTestController controller{listen_tcp}; + SingleProcessorTestController controller{listen_tcp}; LogTestController::getInstance().setTrace<ListenTCP>(); REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT))); REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "100")); @@ -69,7 +70,7 @@ TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") { TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") { const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP"); - test::SingleProcessorTestController controller{listen_tcp}; + SingleProcessorTestController controller{listen_tcp}; REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT))); REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "10")); REQUIRE(listen_tcp->setProperty(ListenTCP::MaxQueueSize, "50")); @@ -78,10 +79,10 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") { controller.plan->scheduleProcessor(listen_tcp); for (auto i = 0; i < 100; ++i) { - sendMessagesViaTCP({"test_message"}, PORT); + REQUIRE(utils::sendMessagesViaTCP({"test_message"}, PORT)); } - CHECK(countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms)); + CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms)); CHECK(controller.trigger().at(ListenTCP::Success).size() == 10); CHECK(controller.trigger().at(ListenTCP::Success).size() == 10); CHECK(controller.trigger().at(ListenTCP::Success).size() == 10); @@ -90,4 +91,76 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") { CHECK(controller.trigger().at(ListenTCP::Success).empty()); } +TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") { + const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP"); + + SingleProcessorTestController controller{listen_tcp}; + auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService"); + LogTestController::getInstance().setTrace<ListenTCP>(); + const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir(); + REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), + minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt"))); + REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), + minifi::utils::file::concat_path(executable_dir, "resources/cert_and_private_key.pem"))); + REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), + minifi::utils::file::concat_path(executable_dir, "resources/cert_and_private_key.pem"))); + REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::Passphrase.getName(), "Password12")); + REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::Port.getName(), std::to_string(PORT))); + REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::MaxBatchSize.getName(), "2")); + REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService")); + std::vector<std::string> expected_successful_messages; + + SECTION("Without client certificate verification") { + SECTION("Client certificate not required, Client Auth set to NONE by default") { + } + SECTION("Client certificate not required, but validated if provided") { + REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT")); + } + ssl_context_service->enable(); + controller.plan->scheduleProcessor(listen_tcp); + + expected_successful_messages = {"test_message_1", "another_message"}; + for (const auto& message : expected_successful_messages) { + REQUIRE(utils::sendMessagesViaSSL({message}, PORT, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt"))); + } + } + + SECTION("With client certificate provided") { + SECTION("Client certificate required") { + REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED")); + } + SECTION("Client certificate not required but validated") { + REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT")); + } + ssl_context_service->enable(); + controller.plan->scheduleProcessor(listen_tcp); + + minifi::utils::net::SslData ssl_data; + ssl_data.ca_loc = minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt"; + ssl_data.cert_loc = minifi::utils::file::FileUtils::get_executable_dir() + "/resources/cert_and_private_key.pem"; + ssl_data.key_loc = minifi::utils::file::FileUtils::get_executable_dir() + "/resources/cert_and_private_key.pem"; + ssl_data.key_pw = "Password12"; + + expected_successful_messages = {"test_message_1", "another_message"}; + for (const auto& message : expected_successful_messages) { + REQUIRE(utils::sendMessagesViaSSL({message}, PORT, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt", ssl_data)); + } + } + + SECTION("Required certificate not provided") { + REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED")); + ssl_context_service->enable(); + controller.plan->scheduleProcessor(listen_tcp); + + REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, PORT, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt"))); + } + + ProcessorTriggerResult result; + REQUIRE(controller.triggerUntil({{ListenTCP::Success, expected_successful_messages.size()}}, result, 300s, 50ms)); + for (std::size_t i = 0; i < expected_successful_messages.size(); ++i) { + CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[i]) == expected_successful_messages[i]); + check_for_attributes(*result.at(ListenTCP::Success)[i]); + } +} + } // namespace org::apache::nifi::minifi::test diff --git a/extensions/standard-processors/tests/unit/resources/ca_cert.crt b/extensions/standard-processors/tests/unit/resources/ca_cert.crt new file mode 100644 index 000000000..e801553c9 --- /dev/null +++ b/extensions/standard-processors/tests/unit/resources/ca_cert.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDNDCCAhwCCQDoXhDkdH/BBjANBgkqhkiG9w0BAQsFADBcMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQ0ExFjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xKDAmBgNVBAMM +H0dvb2QgUm9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMTkwNzEyMDk1MjUz +WhcNNDkwNzA0MDk1MjUzWjBcMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExFjAU +BgNVBAoMDUV4YW1wbGUsIEluYy4xKDAmBgNVBAMMH0dvb2QgUm9vdCBDZXJ0aWZp +Y2F0ZSBBdXRob3JpdHkwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDG +t5qp++4NAO83uASsVx7xRc3YS1Ss6La2opJTeXSnnsL6d+eLIUZrO6R/vofjLMPb +qHisnQXAtl560d/XPBXm/ydp2IBLJQJW9aRxa/zqcf4tDTdBLKXYHhqKSQDJGS78 +vOuNuhf6T+p1guqnLYxwlRp6V8DMY/nC5n+IgByr9Jp2QtqJceH5WdyABVauqtMo +LKXdbhfU6lDZ1XIZNeoKY8u2s34UQLUvOGaP/FzYHvKev1KzFF/nR3+svK8cvxXM +EuqHM5tdtIp1ugjvR66PUIT00HoT00wS6VIpBdHq/8uXJeY77lr52xyVdk282tlw +wr9/W0AGXjVMW3O+VRhFAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAE5HYjHkh3fI +qakhENGL5PSszmOz5yQRrggP2ZJeEAoFjy5fbf/zUPIPMgMa0qM4QI+2C0iGlem1 +c1MCGNk5BiDPWMaUjppYmPZWkXzYu9Nl1dizXYidcnTiiBTROkpMij2fzCErymx9 +CmYxfeFyeJ5uAHSWSOGCfvlxi0vHvHn/+5rm0eqHcGP2c9ivW/SC/6RCXnHuIS9O +O/UHrQPQe7YmdBgCHw4K4UHLZkYPH6osMPdII09PbZBB1TgrogbuA6TMp9NU6LrX +WNN3nhFaVVjEb8tawMabfG9PU/1PKGRuNdaLsYb3IXhT0I/SWobD3MJ9xcO9sAhv +QKZuUQf4ntI= +-----END CERTIFICATE----- diff --git a/extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem b/extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem new file mode 100644 index 000000000..e4891c58b --- /dev/null +++ b/extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem @@ -0,0 +1,46 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpQIBAAKCAQEAtdEk8MHL8RFiOuHz/hfyjEe3PxumlNnubzn7u6EdJpiLhOLs +cWcvQAbYvbsyI6duGzKh1FRNmS8+Q/gp/36vmssZy96B4K+axiG7WBqHfjPJbjfa +NHbz3D7D7i36r/Vu/pXsBMPb/DSJ978maY3oikSB9906bx73XJHAx3RLxgLHl4po +vt20SDOY9R2Klcbew4pDXuUpRdEK0h1+fdZkYU92YiAz2O86eYMvwgorCX9tBrcw +sih/cYYiTQ9DMZ3DPJ0HfDH5T5gIPH/+5/YVHaTBdthhIZ13UNY/X8XcOVO/nlzC +3/MtBiTxKj4zIyqncbClm4BqzW3S5hxUQXR32QIDAQABAoIBAAXoCmwrz4VATFGf +X37EpmN6PPC25D13qvBAEPZycHD9iaLCgG3arUVGM6pON33DBaeqiGlOZ8rvJvWs +TSj4o5nCuU7PJqb27W88T0q4aehmpEeJVvRXXOqtu020fq1Sqs1ob2dkOXRC/Kxo +sEXDj2dWfGZh8HEFr4F5VqrkE0YWaQLaNHf9g6vAuOtlNMnhu5iM7mNq0qQi4Qg0 +zmOpEyAK5obhPEa8eYgjuWUeuul342wpMEGaFqD3lr4rnhcESZtm/S37L6lJ24UF +SIBPzuEjEDlthlh3tqgKyQFsHvcMN4XN4850J/nMoX8jDcvnV7iYYFykHEb6y6FZ ++ZlftNECgYEA2hS+s7yer2bQGw3LsKqwpNmpckLeAb84JEbra6i7A7SPUMEYwjOm +Q2ePpz6ZBVDs7qODBgMV2g9a9GCbSzgV7SeQ0367dCPugONckVQvMVU5wPtSkFKF +8jLn66+6YK64ASTqLLVd8TkU3bdByWcsh3JTR/lmDwlSeGjbm1OETj0CgYEA1W41 +Pi8ZbGDrpc9/CnyFCLMaipq9cAm4n/M58CVf0ogxeAXShIcDboTfv7lqpnqM2vg7 +sSRjyHz0++5VZTNSnQDlLIndQHQ6NKKC1tb0zNKlRuL2gwMHwMmWLqCjbLsqSP5E +lHEM8fn2EVAMiKRod01kOY7OnUnPSSgMD7QvJc0CgYEAyWqZi0WtRhDuKd5+/0dW +6JqDrp1lkDV887xwmLl5KH3uU8ZUSKENcXnHqs7c45UPj4SDcd0NpJ3EAqrrIvjE +/4kocL2/AhBhqrbS+wLGp4iwU7WLVvJw9fXgT8S4na0hEyV2Bx7nifCPfgtQfmSF +Mv/7PSFyCncwrTcjhP0I2H0CgYEAkTNbEaUlXLBLYRDbUx0HvLVstyMzAgf7DQaC +QjiLCkYRsZ/0aqkX0pafSmYwgnYZYddDdO5W3Ez2tnacri7OY3X6c+SPG4x3FNwC +u3qeLMKaIrHCF7t2CNicTbiHti9XQzWJHpwSvITbvUeCX2vKjm+eYfIf6q4OUazn +F7/z23kCgYEA0r2N20DkEN9uD281SZbjAmkjYMMvUkDyChk2oWnODZcFkEhuaGM9 +0TBw1vjpoO67H/tSNEUziXMNLtnH4p5gRzP84ZGpMPxe4MK6pIwbDxhDyPlLCwOG +dYnaYyWqsiMbqtv6LbMh/uatuMpCzqJEQZz80xDNUp3k90muk3+0M4w= +-----END RSA PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIIDJTCCAg0CCQDclmfqcI6z1DANBgkqhkiG9w0BAQsFADBcMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQ0ExFjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xKDAmBgNVBAMM +H0dvb2QgUm9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMTkwNzEyMDk1MjUz +WhcNMjkwNzA5MDk1MjUzWjBNMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExFjAU +BgNVBAoMDUV4YW1wbGUsIEluYy4xGTAXBgNVBAMMEGdvb2QuZXhhbXBsZS5jb20w +ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC10STwwcvxEWI64fP+F/KM +R7c/G6aU2e5vOfu7oR0mmIuE4uxxZy9ABti9uzIjp24bMqHUVE2ZLz5D+Cn/fq+a +yxnL3oHgr5rGIbtYGod+M8luN9o0dvPcPsPuLfqv9W7+lewEw9v8NIn3vyZpjeiK +RIH33TpvHvdckcDHdEvGAseXimi+3bRIM5j1HYqVxt7DikNe5SlF0QrSHX591mRh +T3ZiIDPY7zp5gy/CCisJf20GtzCyKH9xhiJND0MxncM8nQd8MflPmAg8f/7n9hUd +pMF22GEhnXdQ1j9fxdw5U7+eXMLf8y0GJPEqPjMjKqdxsKWbgGrNbdLmHFRBdHfZ +AgMBAAEwDQYJKoZIhvcNAQELBQADggEBAIh6k/epw3dWtRuMwXxjqEobi/RD/8Nk +52kX6x8WTcnglrSzPSvkhnfR5PQ9whY2Zbw0aVdenejlGZEi8cAxwmJbN4NIhQwW +FjHYYQA0MPgFGq/4XFT9E49aS212+ivUBRoPlWfw7QmCdGq3z6eQGfVtIGGLfSGH +cvnC9Z4VdY0RJrnzgRKd7iq/RW66u3Uyg1fdOKCp9F5PSwwl+6dPgKO84muWjRi4 +9y+htcXSboEtYQy/ncul0MeJ8fGTY1YEG2QUolmCKeJ8a2e6SHcX0Unu+6tAD8sK +fjpZAOI1lgRrhrIKhi3Rx0aCkhayhvvScDQL0ODA5ciCu5EJHRhWCbg= +-----END CERTIFICATE----- diff --git a/extensions/windows-event-log/tests/CWELCustomProviderTests.cpp b/extensions/windows-event-log/tests/CWELCustomProviderTests.cpp index a06125cdf..1a771ec3a 100644 --- a/extensions/windows-event-log/tests/CWELCustomProviderTests.cpp +++ b/extensions/windows-event-log/tests/CWELCustomProviderTests.cpp @@ -38,6 +38,8 @@ // using the command "mc -um unit-test-provider.man" #include "custom-provider/unit-test-provider.h" +namespace org::apache::nifi::minifi::test { + namespace { struct CustomEventData { @@ -151,7 +153,7 @@ const std::string EVENT_DATA_JSON = R"( TEST_CASE("ConsumeWindowsEventLog prints events in JSON::Simple correctly custom provider", "[onTrigger]") { std::string event = CustomProviderController{"JSON", "Simple"}.run(); - verifyJSON(event, R"( + utils::verifyJSON(event, R"( { "System": { "Provider": { @@ -166,7 +168,7 @@ TEST_CASE("ConsumeWindowsEventLog prints events in JSON::Simple correctly custom TEST_CASE("ConsumeWindowsEventLog prints events in JSON::Flattened correctly custom provider", "[onTrigger]") { std::string event = CustomProviderController{"JSON", "Flattened"}.run(); - verifyJSON(event, R"( + utils::verifyJSON(event, R"( { "Name": ")" + CUSTOM_PROVIDER_NAME + R"(", "Channel": ")" + CUSTOM_CHANNEL /* Channel is not overwritten by data named "Channel" */ + R"(", @@ -176,3 +178,5 @@ TEST_CASE("ConsumeWindowsEventLog prints events in JSON::Flattened correctly cus } )"); } + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp index 90e728a93..47d69bc43 100644 --- a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp +++ b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp @@ -36,6 +36,8 @@ using PutFile = org::apache::nifi::minifi::processors::PutFile; using ConfigurableComponent = org::apache::nifi::minifi::core::ConfigurableComponent; using IdGenerator = org::apache::nifi::minifi::utils::IdGenerator; +namespace org::apache::nifi::minifi::test { + namespace { const std::string APPLICATION_CHANNEL = "Application"; @@ -69,7 +71,7 @@ TEST_CASE("ConsumeWindowsEventLog constructor works", "[create]") { std::shared_ptr<TestPlan> test_plan = test_controller.createPlan(); REQUIRE_NOTHROW(ConsumeWindowsEventLog("one")); - REQUIRE_NOTHROW(ConsumeWindowsEventLog("two", utils::IdGenerator::getIdGenerator()->generate())); + REQUIRE_NOTHROW(ConsumeWindowsEventLog("two", IdGenerator::getIdGenerator()->generate())); REQUIRE_NOTHROW(test_plan->addProcessor("ConsumeWindowsEventLog", "cwel")); } @@ -339,7 +341,7 @@ TEST_CASE("ConsumeWindowsEventLog prints events in JSON::Simple correctly", "[on std::string event = SimpleFormatTestController{APPLICATION_CHANNEL, "*", "JSON", "Simple"}.run(); // the json must be single-line REQUIRE(event.find('\n') == std::string::npos); - verifyJSON(event, R"json( + utils::verifyJSON(event, R"json( { "System": { "Provider": { @@ -358,7 +360,7 @@ TEST_CASE("ConsumeWindowsEventLog prints events in JSON::Simple correctly", "[on TEST_CASE("ConsumeWindowsEventLog prints events in JSON::Flattened correctly", "[onTrigger]") { std::string event = SimpleFormatTestController{APPLICATION_CHANNEL, "*", "JSON", "Flattened"}.run(); - verifyJSON(event, R"json( + utils::verifyJSON(event, R"json( { "Name": "Application", "Channel": "Application", @@ -373,7 +375,7 @@ TEST_CASE("ConsumeWindowsEventLog prints events in JSON::Flattened correctly", " TEST_CASE("ConsumeWindowsEventLog prints events in JSON::Raw correctly", "[onTrigger]") { std::string event = SimpleFormatTestController{APPLICATION_CHANNEL, "*", "JSON", "Raw"}.run(); - verifyJSON(event, R"json( + utils::verifyJSON(event, R"json( [ { "name": "Event", @@ -438,3 +440,5 @@ TEST_CASE("ConsumeWindowsEventLog batch commit size works", "[onTrigger]") { batchCommitSizeTestHelper(5, 1, 1); batchCommitSizeTestHelper(5, 0, 5); } + +} // namespace org::apache::nifi::minifi::test diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index dcf4af558..8aacce653 100644 --- a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -35,12 +35,14 @@ #include "core/Repository.h" #include "utils/FlowFileQueue.h" -struct ConnectionTestAccessor; - namespace org::apache::nifi::minifi { +namespace test::utils { +struct ConnectionTestAccessor; +} // namespace test::utils + class Connection : public core::Connectable { - friend struct ::ConnectionTestAccessor; + friend struct test::utils::ConnectionTestAccessor; public: explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, const std::string &name); explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, const std::string &name, const utils::Identifier &uuid); diff --git a/libminifi/include/utils/FlowFileQueue.h b/libminifi/include/utils/FlowFileQueue.h index cebd939d6..584975d7b 100644 --- a/libminifi/include/utils/FlowFileQueue.h +++ b/libminifi/include/utils/FlowFileQueue.h @@ -26,16 +26,14 @@ #include "SwapManager.h" #include "TimeUtil.h" +namespace org::apache::nifi::minifi::test::utils { struct FlowFileQueueTestAccessor; +} // namespace org::apache::nifi::minifi::test::utils -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { +namespace org::apache::nifi::minifi::utils { class FlowFileQueue { - friend struct ::FlowFileQueueTestAccessor; + friend struct test::utils::FlowFileQueueTestAccessor; using TimePoint = std::chrono::steady_clock::time_point; public: @@ -109,8 +107,4 @@ class FlowFileQueue { std::shared_ptr<core::logging::Logger> logger_; }; -} // namespace utils -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/net/Server.h b/libminifi/include/utils/net/Server.h index 1266a148a..5a6d7622d 100644 --- a/libminifi/include/utils/net/Server.h +++ b/libminifi/include/utils/net/Server.h @@ -49,13 +49,13 @@ struct Message { class Server { public: - void run() { + virtual void run() { io_context_.run(); } - void reset() { + virtual void reset() { io_context_.restart(); } - void stop() { + virtual void stop() { io_context_.stop(); } bool queueEmpty() { @@ -70,7 +70,7 @@ class Server { protected: Server(std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger) - : max_queue_size_(max_queue_size), logger_(logger) {} + : max_queue_size_(std::move(max_queue_size)), logger_(logger) {} utils::ConcurrentQueue<Message> concurrent_queue_; asio::io_context io_context_; diff --git a/libminifi/include/utils/net/SessionHandlingServer.h b/libminifi/include/utils/net/SessionHandlingServer.h new file mode 100644 index 000000000..f716a8758 --- /dev/null +++ b/libminifi/include/utils/net/SessionHandlingServer.h @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <utility> +#include <memory> + +#include "Server.h" +#include "asio/ssl.hpp" + +namespace org::apache::nifi::minifi::utils::net { + +template<typename SessionType> +class SessionHandlingServer : public Server { + public: + SessionHandlingServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger) + : Server(max_queue_size, std::move(logger)), + acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) { + } + + void run() override { + startAccept(); + Server::run(); + } + + protected: + void startAccept() { + auto new_session = createSession(); + acceptor_.async_accept(new_session->getSocket(), + [this, new_session](const auto& error_code) -> void { + handleAccept(new_session, error_code); + }); + } + + void handleAccept(const std::shared_ptr<SessionType>& session, const std::error_code& error) { + if (error) { + return; + } + + session->start(); + auto new_session = createSession(); + acceptor_.async_accept(new_session->getSocket(), + [this, new_session](const auto& error_code) -> void { + handleAccept(new_session, error_code); + }); + } + + virtual std::shared_ptr<SessionType> createSession() = 0; + + asio::ip::tcp::acceptor acceptor_; +}; + +} // namespace org::apache::nifi::minifi::utils::net diff --git a/libminifi/include/utils/net/Ssl.h b/libminifi/include/utils/net/Ssl.h new file mode 100644 index 000000000..de8472889 --- /dev/null +++ b/libminifi/include/utils/net/Ssl.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string> +#include <memory> +#include <optional> + +#include "core/ProcessContext.h" +#include "core/Property.h" +#include "core/logging/Logger.h" + +namespace org::apache::nifi::minifi::utils::net { + +struct SslData { + std::string ca_loc; + std::string cert_loc; + std::string key_loc; + std::string key_pw; + + bool isValid() const { + return !cert_loc.empty() && !key_loc.empty(); + } +}; + +std::optional<utils::net::SslData> getSslData(const core::ProcessContext& context, const core::Property& ssl_prop, const std::shared_ptr<core::logging::Logger>& logger); + +} // namespace org::apache::nifi::minifi::utils::net diff --git a/libminifi/include/utils/net/TcpServer.h b/libminifi/include/utils/net/SslServer.h similarity index 56% copy from libminifi/include/utils/net/TcpServer.h copy to libminifi/include/utils/net/SslServer.h index 3ccebdb78..ddd04e773 100644 --- a/libminifi/include/utils/net/TcpServer.h +++ b/libminifi/include/utils/net/SslServer.h @@ -16,47 +16,50 @@ */ #pragma once -#include <optional> +#include "SessionHandlingServer.h" + #include <memory> -#include <system_error> +#include <string> -#include "Server.h" -#include "utils/MinifiConcurrentQueue.h" -#include "core/logging/Logger.h" -#include "core/logging/LoggerConfiguration.h" -#include "asio/ts/buffer.hpp" -#include "asio/ts/internet.hpp" -#include "asio/streambuf.hpp" +#include "Ssl.h" +#include "asio/ssl.hpp" namespace org::apache::nifi::minifi::utils::net { -class TcpSession : public std::enable_shared_from_this<TcpSession> { +using ssl_socket = asio::ssl::stream<asio::ip::tcp::socket>; + +class SslSession : public std::enable_shared_from_this<SslSession> { public: - TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger); + SslSession(asio::io_context& io_context, asio::ssl::context& context, utils::ConcurrentQueue<Message>& concurrent_queue, + std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger); - asio::ip::tcp::socket& getSocket(); + ssl_socket::lowest_layer_type& getSocket(); void start(); void handleReadUntilNewLine(std::error_code error_code); - private: + protected: utils::ConcurrentQueue<Message>& concurrent_queue_; std::optional<size_t> max_queue_size_; - asio::ip::tcp::socket socket_; asio::basic_streambuf<std::allocator<char>> buffer_; std::shared_ptr<core::logging::Logger> logger_; + ssl_socket socket_; }; -class TcpServer : public Server { +class SslServer : public SessionHandlingServer<SslSession> { public: - TcpServer(std::optional<size_t> max_queue_size, - uint16_t port, - std::shared_ptr<core::logging::Logger> logger); + SMART_ENUM(ClientAuthOption, + (NONE, "NONE"), + (WANT, "WANT"), + (REQUIRED, "REQUIRED") + ) + + SslServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger, SslData ssl_data, ClientAuthOption client_auth); - private: - void startAccept(); - void handleAccept(const std::shared_ptr<TcpSession>& session, const std::error_code& error); + protected: + std::shared_ptr<SslSession> createSession() override; - asio::ip::tcp::acceptor acceptor_; + asio::ssl::context context_; + SslData ssl_data_; }; } // namespace org::apache::nifi::minifi::utils::net diff --git a/libminifi/include/utils/net/TcpServer.h b/libminifi/include/utils/net/TcpServer.h index 3ccebdb78..a70b26bd9 100644 --- a/libminifi/include/utils/net/TcpServer.h +++ b/libminifi/include/utils/net/TcpServer.h @@ -18,22 +18,14 @@ #include <optional> #include <memory> -#include <system_error> -#include "Server.h" -#include "utils/MinifiConcurrentQueue.h" -#include "core/logging/Logger.h" -#include "core/logging/LoggerConfiguration.h" -#include "asio/ts/buffer.hpp" -#include "asio/ts/internet.hpp" -#include "asio/streambuf.hpp" +#include "SessionHandlingServer.h" namespace org::apache::nifi::minifi::utils::net { class TcpSession : public std::enable_shared_from_this<TcpSession> { public: TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger); - asio::ip::tcp::socket& getSocket(); void start(); void handleReadUntilNewLine(std::error_code error_code); @@ -41,22 +33,19 @@ class TcpSession : public std::enable_shared_from_this<TcpSession> { private: utils::ConcurrentQueue<Message>& concurrent_queue_; std::optional<size_t> max_queue_size_; - asio::ip::tcp::socket socket_; asio::basic_streambuf<std::allocator<char>> buffer_; + asio::ip::tcp::socket socket_; std::shared_ptr<core::logging::Logger> logger_; }; -class TcpServer : public Server { +class TcpServer : public SessionHandlingServer<TcpSession> { public: TcpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger); - private: - void startAccept(); - void handleAccept(const std::shared_ptr<TcpSession>& session, const std::error_code& error); - - asio::ip::tcp::acceptor acceptor_; + protected: + std::shared_ptr<TcpSession> createSession() override; }; } // namespace org::apache::nifi::minifi::utils::net diff --git a/libminifi/src/utils/net/Ssl.cpp b/libminifi/src/utils/net/Ssl.cpp new file mode 100644 index 000000000..9506423bc --- /dev/null +++ b/libminifi/src/utils/net/Ssl.cpp @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "utils/net/Ssl.h" +#include "controllers/SSLContextService.h" + +namespace org::apache::nifi::minifi::utils::net { + +std::optional<utils::net::SslData> getSslData(const core::ProcessContext& context, const core::Property& ssl_prop, const std::shared_ptr<core::logging::Logger>& logger) { + auto getSslContextService = [&]() -> std::shared_ptr<minifi::controllers::SSLContextService> { + if (auto ssl_service_name = context.getProperty(ssl_prop); ssl_service_name && !ssl_service_name->empty()) { + if (auto service = context.getControllerService(*ssl_service_name)) { + if (auto ssl_service = std::dynamic_pointer_cast<org::apache::nifi::minifi::controllers::SSLContextService>(service)) { + return ssl_service; + } else { + logger->log_warn("SSL Context Service property is set to '%s', but it is not a valid SSLContextService.", *ssl_service_name); + } + } else { + logger->log_warn("SSL Context Service property is set to '%s', but the controller service could not be found.", *ssl_service_name); + } + } else { + logger->log_warn("No valid SSL Context Service property is set."); + } + return nullptr; + }; + + if (auto ssl_service = getSslContextService()) { + utils::net::SslData ssl_data; + ssl_data.ca_loc = ssl_service->getCACertificate(); + ssl_data.cert_loc = ssl_service->getCertificateFile(); + ssl_data.key_loc = ssl_service->getPrivateKeyFile(); + ssl_data.key_pw = ssl_service->getPassphrase(); + return ssl_data; + } + + return std::nullopt; +} + +} // namespace org::apache::nifi::minifi::utils::net diff --git a/libminifi/src/utils/net/SslServer.cpp b/libminifi/src/utils/net/SslServer.cpp new file mode 100644 index 000000000..2d1e14807 --- /dev/null +++ b/libminifi/src/utils/net/SslServer.cpp @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "utils/net/SslServer.h" + +namespace org::apache::nifi::minifi::utils::net { + +SslSession::SslSession(asio::io_context& io_context, asio::ssl::context& context, utils::ConcurrentQueue<Message>& concurrent_queue, + std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger) + : concurrent_queue_(concurrent_queue), + max_queue_size_(max_queue_size), + logger_(std::move(logger)), + socket_(io_context, context) { +} + +ssl_socket::lowest_layer_type& SslSession::getSocket() { + return socket_.lowest_layer(); +} + +void SslSession::start() { + socket_.async_handshake(asio::ssl::stream_base::server, + [this, self = shared_from_this()](const std::error_code& error_code) { + if (error_code) { + logger_->log_error("Error occured during SSL handshake: (%d) %s", error_code.value(), error_code.message()); + return; + } + asio::async_read_until(socket_, + buffer_, + '\n', + [self](const auto& error_code, size_t) -> void { + self->handleReadUntilNewLine(error_code); + }); + }); +} + +void SslSession::handleReadUntilNewLine(std::error_code error_code) { + if (error_code) + return; + std::istream is(&buffer_); + std::string message; + std::getline(is, message); + if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) + concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, getSocket().remote_endpoint().address(), getSocket().local_endpoint().port())); + else + logger_->log_warn("Queue is full. TCP message ignored."); + asio::async_read_until(socket_, + buffer_, + '\n', + [self = shared_from_this()](const auto& error_code, size_t) -> void { + self->handleReadUntilNewLine(error_code); + }); +} + +SslServer::SslServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger, SslData ssl_data, ClientAuthOption client_auth) + : SessionHandlingServer<SslSession>(max_queue_size, port, std::move(logger)), + context_(asio::ssl::context::sslv23), + ssl_data_(std::move(ssl_data)) { + context_.set_options( + asio::ssl::context::default_workarounds + | asio::ssl::context::no_sslv2 + | asio::ssl::context::single_dh_use); + context_.set_password_callback([this](std::size_t&, asio::ssl::context_base::password_purpose&) { return ssl_data_.key_pw; }); + context_.use_certificate_file(ssl_data_.cert_loc, asio::ssl::context::pem); + context_.use_private_key_file(ssl_data_.key_loc, asio::ssl::context::pem); + context_.load_verify_file(ssl_data_.ca_loc); + if (client_auth == ClientAuthOption::REQUIRED) { + context_.set_verify_mode(asio::ssl::verify_peer|asio::ssl::verify_fail_if_no_peer_cert); + } else if (client_auth == ClientAuthOption::WANT) { + context_.set_verify_mode(asio::ssl::verify_peer); + } + startAccept(); +} + +std::shared_ptr<SslSession> SslServer::createSession() { + return std::make_shared<SslSession>(io_context_, context_, concurrent_queue_, max_queue_size_, logger_); +} + +} // namespace org::apache::nifi::minifi::utils::net diff --git a/libminifi/src/utils/net/TcpServer.cpp b/libminifi/src/utils/net/TcpServer.cpp index 6f0bb13de..e3a53db55 100644 --- a/libminifi/src/utils/net/TcpServer.cpp +++ b/libminifi/src/utils/net/TcpServer.cpp @@ -57,29 +57,12 @@ void TcpSession::handleReadUntilNewLine(std::error_code error_code) { } TcpServer::TcpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger) - : Server(max_queue_size, std::move(logger)), - acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) { + : SessionHandlingServer<TcpSession>(max_queue_size, port, std::move(logger)) { startAccept(); } -void TcpServer::startAccept() { - auto new_session = std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_, logger_); - acceptor_.async_accept(new_session->getSocket(), - [this, new_session](const auto& error_code) -> void { - handleAccept(new_session, error_code); - }); -} - -void TcpServer::handleAccept(const std::shared_ptr<TcpSession>& session, const std::error_code& error) { - if (error) - return; - - session->start(); - auto new_session = std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_, logger_); - acceptor_.async_accept(new_session->getSocket(), - [this, new_session](const auto& error_code) -> void { - handleAccept(new_session, error_code); - }); +std::shared_ptr<TcpSession> TcpServer::createSession() { + return std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_, logger_); } } // namespace org::apache::nifi::minifi::utils::net diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h index 09f86663e..96389f740 100644 --- a/libminifi/test/Utils.h +++ b/libminifi/test/Utils.h @@ -22,6 +22,8 @@ #include "rapidjson/document.h" #include "asio.hpp" +#include "asio/ssl.hpp" +#include "net/Ssl.h" using namespace std::chrono_literals; @@ -42,6 +44,8 @@ using namespace std::chrono_literals; return std::forward<T>(instance).method(std::forward<Args>(args)...); \ } +namespace org::apache::nifi::minifi::test::utils { + // carries out a loose match on objects, i.e. it doesn't matter if the // actual object has extra fields than expected void matchJSON(const rapidjson::Value& actual, const rapidjson::Value& expected) { @@ -107,7 +111,7 @@ bool countLogOccurrencesUntil(const std::string& pattern, return false; } -void sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t port) { +bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t port) { asio::io_context io_context; asio::ip::tcp::socket socket(io_context); asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port); @@ -116,10 +120,13 @@ void sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t for (auto& content : contents) { std::string tcp_message(content); tcp_message += '\n'; - socket.send(asio::buffer(tcp_message, tcp_message.size()), 0, err); + asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err); + } + if (err) { + return false; } - REQUIRE(!err); socket.close(); + return true; } struct ConnectionTestAccessor { @@ -135,3 +142,38 @@ struct FlowFileQueueTestAccessor { FIELD_ACCESSOR(load_task_); FIELD_ACCESSOR(queue_); }; + +bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t port, const std::string& ca_cert_path, const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) { + asio::ssl::context ctx(asio::ssl::context::sslv23); + ctx.load_verify_file(ca_cert_path); + if (ssl_data) { + ctx.set_verify_mode(asio::ssl::verify_peer); + ctx.use_certificate_file(ssl_data->cert_loc, asio::ssl::context::pem); + ctx.use_private_key_file(ssl_data->key_loc, asio::ssl::context::pem); + ctx.set_password_callback([password = ssl_data->key_pw](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; }); + } + asio::io_context io_context; + asio::ssl::stream<asio::ip::tcp::socket> socket(io_context, ctx); + asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port); + asio::error_code err; + socket.lowest_layer().connect(remote_endpoint, err); + if (err) { + return false; + } + socket.handshake(asio::ssl::stream_base::client, err); + if (err) { + return false; + } + for (auto& content : contents) { + std::string tcp_message(content); + tcp_message += '\n'; + asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err); + } + if (err) { + return false; + } + socket.lowest_layer().close(); + return true; +} + +} // namespace org::apache::nifi::minifi::test::utils diff --git a/libminifi/test/rocksdb-tests/SwapTests.cpp b/libminifi/test/rocksdb-tests/SwapTests.cpp index 3739376bb..053f08333 100644 --- a/libminifi/test/rocksdb-tests/SwapTests.cpp +++ b/libminifi/test/rocksdb-tests/SwapTests.cpp @@ -27,6 +27,8 @@ #include "core/ProcessSession.h" #include "../unit/ProvenanceTestHelper.h" +namespace org::apache::nifi::minifi::test { + class OutputProcessor : public core::Processor { public: using core::Processor::Processor; @@ -86,8 +88,8 @@ TEST_CASE("Connection will on-demand swap flow files") { auto dir = testController.createTempDirectory(); auto config = std::make_shared<minifi::Configure>(); - config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository")); - config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository")); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, minifi::utils::file::FileUtils::concat_path(dir, "content_repository")); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, minifi::utils::file::FileUtils::concat_path(dir, "flowfile_repository")); auto prov_repo = std::make_shared<TestRepository>(); auto ff_repo = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository"); @@ -102,7 +104,7 @@ TEST_CASE("Connection will on-demand swap flow files") { auto processor = std::make_shared<OutputProcessor>("proc"); - auto connection = std::make_shared<minifi::Connection>(ff_repo, content_repo, swap_manager, "conn", utils::IdGenerator::getIdGenerator()->generate()); + auto connection = std::make_shared<minifi::Connection>(ff_repo, content_repo, swap_manager, "conn", minifi::utils::IdGenerator::getIdGenerator()->generate()); connection->setSwapThreshold(50); connection->addRelationship(OutputProcessor::Success); connection->setSourceUUID(processor->getUUID()); @@ -117,9 +119,9 @@ TEST_CASE("Connection will on-demand swap flow files") { } REQUIRE(connection->getQueueSize() == processor->flow_files_.size()); - utils::FlowFileQueue& queue = ConnectionTestAccessor::get_queue_(*connection); + minifi::utils::FlowFileQueue& queue = utils::ConnectionTestAccessor::get_queue_(*connection); // below max threshold live flow files - REQUIRE(FlowFileQueueTestAccessor::get_queue_(queue).size() <= 75); + REQUIRE(utils::FlowFileQueueTestAccessor::get_queue_(queue).size() <= 75); REQUIRE(queue.size() == 200); std::set<std::shared_ptr<core::FlowFile>> expired; @@ -135,3 +137,5 @@ TEST_CASE("Connection will on-demand swap flow files") { REQUIRE(queue.empty()); } + +} // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/sql-tests/ExecuteSQLTests.cpp b/libminifi/test/sql-tests/ExecuteSQLTests.cpp index 32e6eeb4b..618a20dd9 100644 --- a/libminifi/test/sql-tests/ExecuteSQLTests.cpp +++ b/libminifi/test/sql-tests/ExecuteSQLTests.cpp @@ -25,6 +25,8 @@ #include "Utils.h" #include "FlowFileMatcher.h" +namespace org::apache::nifi::minifi::test { + TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") { SQLTestController controller; @@ -43,7 +45,7 @@ TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") { REQUIRE(row_count == "2"); auto content = plan->getContent(flow_files[0]); - verifyJSON(content, R"( + utils::verifyJSON(content, R"( [{ "int_col": 11, "text_col": "one" @@ -74,7 +76,7 @@ TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") { REQUIRE(row_count == "1"); auto content = plan->getContent(flow_files[0]); - verifyJSON(content, R"( + utils::verifyJSON(content, R"( [{ "int_col": 11, "text_col": "one" @@ -100,7 +102,7 @@ TEST_CASE("ExecuteSQL uses statement in content", "[ExecuteSQL3]") { REQUIRE(row_count == "2"); auto content = plan->getContent(flow_files[0]); - verifyJSON(content, R"( + utils::verifyJSON(content, R"( [{ "int_col": 11, "text_col": "one" @@ -132,7 +134,7 @@ TEST_CASE("ExecuteSQL uses sql.args.N.value attributes", "[ExecuteSQL4]") { REQUIRE(row_count == "1"); auto content = plan->getContent(flow_files[0]); - verifyJSON(content, R"( + utils::verifyJSON(content, R"( [{ "int_col": 11, "text_col": "banana" @@ -161,7 +163,7 @@ TEST_CASE("ExecuteSQL honors Max Rows Per Flow File", "[ExecuteSQL5]") { plan->run(); auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) { - verifyJSON(plan->getContent(actual), expected); + utils::verifyJSON(plan->getContent(actual), expected); }; FlowFileMatcher matcher{content_verifier, { @@ -196,3 +198,5 @@ TEST_CASE("ExecuteSQL incoming flow file is malformed", "[ExecuteSQL6]") { REQUIRE_THROWS(plan->run()); } + +} // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/sql-tests/QueryDatabaseTableTests.cpp b/libminifi/test/sql-tests/QueryDatabaseTableTests.cpp index f9e4a5a2b..f51363569 100644 --- a/libminifi/test/sql-tests/QueryDatabaseTableTests.cpp +++ b/libminifi/test/sql-tests/QueryDatabaseTableTests.cpp @@ -24,6 +24,8 @@ #include "Utils.h" #include "FlowFileMatcher.h" +namespace org::apache::nifi::minifi::test { + TEST_CASE("QueryDatabaseTable queries the table and returns specified columns", "[QueryDatabaseTable1]") { SQLTestController controller; @@ -48,7 +50,7 @@ TEST_CASE("QueryDatabaseTable queries the table and returns specified columns", flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count); REQUIRE(row_count == "3"); auto content = plan->getContent(flow_files[0]); - verifyJSON(content, R"( + utils::verifyJSON(content, R"( [{"text_col": "one"}, {"text_col": "two"}, {"text_col": "three"}] )", true); } @@ -88,7 +90,7 @@ TEST_CASE("QueryDatabaseTable requerying the table returns only new rows", "[Que second_flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count); REQUIRE(row_count == "2"); auto content = plan->getContent(second_flow_files[0]); - verifyJSON(content, R"( + utils::verifyJSON(content, R"( [{"text_col": "four"}, {"text_col": "five"}] )", true); } @@ -119,7 +121,7 @@ TEST_CASE("QueryDatabaseTable specifying initial max values", "[QueryDatabaseTab flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count); REQUIRE(row_count == "2"); auto content = plan->getContent(flow_files[0]); - verifyJSON(content, R"( + utils::verifyJSON(content, R"( [{"text_col": "three"}, {"text_col": "four"}] )", true); } @@ -145,7 +147,7 @@ TEST_CASE("QueryDatabaseTable honors Max Rows Per Flow File and sets output attr plan->run(); auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) { - verifyJSON(plan->getContent(actual), expected, true); + utils::verifyJSON(plan->getContent(actual), expected, true); }; FlowFileMatcher matcher(content_verifier, { @@ -199,7 +201,7 @@ TEST_CASE("QueryDatabaseTable changing table name resets state", "[QueryDatabase sql_proc->setProperty(minifi::processors::QueryDatabaseTable::TableName.getName(), "empty_test_table"); plan->run(true); flow_files = plan->getOutputs({"success", "d"}); - REQUIRE(flow_files.size() == 0); + REQUIRE(flow_files.empty()); // again query "test_table", by now the stored state is reset, so all rows are returned sql_proc->setProperty(minifi::processors::QueryDatabaseTable::TableName.getName(), "test_table"); @@ -250,3 +252,5 @@ TEST_CASE("QueryDatabaseTable changing maximum value columns resets state", "[Qu flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count); REQUIRE(row_count == "3"); } + +} // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/unit/FlowFileQueueSwapTests.cpp b/libminifi/test/unit/FlowFileQueueSwapTests.cpp index a2e52cbc4..80d0f4f74 100644 --- a/libminifi/test/unit/FlowFileQueueSwapTests.cpp +++ b/libminifi/test/unit/FlowFileQueueSwapTests.cpp @@ -22,6 +22,8 @@ #include "../TestBase.h" #include "SwapTestController.h" +namespace org::apache::nifi::minifi::test { + TEST_CASE("Setting swap threshold sets underlying queue limits", "[SwapTest1]") { const size_t target_size = 4; const size_t min_size = target_size / 2; @@ -29,9 +31,9 @@ TEST_CASE("Setting swap threshold sets underlying queue limits", "[SwapTest1]") minifi::Connection conn(nullptr, nullptr, ""); conn.setSwapThreshold(target_size); - REQUIRE(FlowFileQueueTestAccessor::get_min_size_(ConnectionTestAccessor::get_queue_(conn)) == min_size); - REQUIRE(FlowFileQueueTestAccessor::get_target_size_(ConnectionTestAccessor::get_queue_(conn)) == target_size); - REQUIRE(FlowFileQueueTestAccessor::get_max_size_(ConnectionTestAccessor::get_queue_(conn)) == max_size); + REQUIRE(utils::FlowFileQueueTestAccessor::get_min_size_(utils::ConnectionTestAccessor::get_queue_(conn)) == min_size); + REQUIRE(utils::FlowFileQueueTestAccessor::get_target_size_(utils::ConnectionTestAccessor::get_queue_(conn)) == target_size); + REQUIRE(utils::FlowFileQueueTestAccessor::get_max_size_(utils::ConnectionTestAccessor::get_queue_(conn)) == max_size); } TEST_CASE_METHOD(SwapTestController, "Default constructed FlowFileQueue won't swap", "[SwapTest2]") { @@ -181,3 +183,5 @@ TEST_CASE_METHOD(SwapTestController, "Popping below min checks if the pending lo verifySwapEvents({{Load, {120}}}); verifyQueue({70, 80, 90, 100, 110}, {{}}, {}); } + +} // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/unit/SwapTestController.h b/libminifi/test/unit/SwapTestController.h index f6650d2ee..002e87d61 100644 --- a/libminifi/test/unit/SwapTestController.h +++ b/libminifi/test/unit/SwapTestController.h @@ -30,6 +30,7 @@ #include "../Catch.h" #include "../unit/ProvenanceTestHelper.h" +namespace org::apache::nifi::minifi::test { using Timepoint = std::chrono::time_point<std::chrono::steady_clock>; @@ -75,7 +76,7 @@ class SwappingFlowFileTestRepo : public TestFlowRepository, public minifi::SwapM for (const auto& ff_id : flow_files) { std::string value; Get(ff_id.id.to_string().c_str(), value); - utils::Identifier container_id; + minifi::utils::Identifier container_id; auto ff = minifi::FlowFileRecord::DeSerialize(std::as_bytes(std::span(value)), content_repo_, container_id); ff->setPenaltyExpiration(ff_id.to_be_processed_after); load_task.result.push_back(std::move(ff)); @@ -127,7 +128,7 @@ struct VerifiedQueue { void verify(std::initializer_list<unsigned> live, std::optional<std::initializer_list<unsigned>> inter, std::initializer_list<unsigned> swapped) const { // check live ffs - auto live_copy = FlowFileQueueTestAccessor::get_queue_(impl); + auto live_copy = utils::FlowFileQueueTestAccessor::get_queue_(impl); REQUIRE(live_copy.size() == live.size()); for (auto sec : live) { auto min = live_copy.popMin(); @@ -136,9 +137,9 @@ struct VerifiedQueue { // check inter ffs if (!inter) { - REQUIRE_FALSE(FlowFileQueueTestAccessor::get_load_task_(impl).has_value()); + REQUIRE_FALSE(utils::FlowFileQueueTestAccessor::get_load_task_(impl).has_value()); } else { - auto& intermediate = FlowFileQueueTestAccessor::get_load_task_(impl)->intermediate_items; + auto& intermediate = utils::FlowFileQueueTestAccessor::get_load_task_(impl)->intermediate_items; REQUIRE(intermediate.size() == inter->size()); size_t idx = 0; for (auto sec : inter.value()) { @@ -148,7 +149,7 @@ struct VerifiedQueue { } // check swapped ffs - auto swapped_copy = FlowFileQueueTestAccessor::get_swapped_flow_files_(impl); + auto swapped_copy = utils::FlowFileQueueTestAccessor::get_swapped_flow_files_(impl); REQUIRE(swapped_copy.size() == swapped.size()); for (auto sec : swapped) { auto min = swapped_copy.popMin(); @@ -169,7 +170,7 @@ struct VerifiedQueue { explicit VerifiedQueue(std::shared_ptr<minifi::SwapManager> swap_manager) : impl(std::move(swap_manager)) {} - utils::FlowFileQueue impl; + minifi::utils::FlowFileQueue impl; FlowFilePtrVec ref_; }; @@ -179,8 +180,8 @@ class SwapTestController : public TestController { content_repo_ = std::make_shared<core::repository::VolatileContentRepository>(); flow_repo_ = std::make_shared<SwappingFlowFileTestRepo>(); flow_repo_->loadComponent(content_repo_); - clock_ = std::make_shared<utils::ManualClock>(); - utils::timeutils::setClock(clock_); + clock_ = std::make_shared<minifi::utils::ManualClock>(); + minifi::utils::timeutils::setClock(clock_); queue_ = std::make_shared<VerifiedQueue>(std::static_pointer_cast<minifi::SwapManager>(flow_repo_)); } @@ -233,5 +234,7 @@ class SwapTestController : public TestController { std::shared_ptr<SwappingFlowFileTestRepo> flow_repo_; std::shared_ptr<core::repository::VolatileContentRepository> content_repo_; std::shared_ptr<VerifiedQueue> queue_; - std::shared_ptr<utils::ManualClock> clock_; + std::shared_ptr<minifi::utils::ManualClock> clock_; }; + +} // namespace org::apache::nifi::minifi::test
