adamdebreceni commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1236720636
##########
extensions/standard-processors/tests/unit/GetTCPTests.cpp:
##########
@@ -15,391 +14,286 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <utility>
-#include <memory>
#include <string>
-#include <vector>
-#include <set>
-#include "unit/ProvenanceTestHelper.h"
-#include "TestBase.h"
-#include "Catch.h"
-#include "RandomServerSocket.h"
-#include "Scheduling.h"
-#include "LogAttribute.h"
-#include "GetTCP.h"
-#include "core/Core.h"
-#include "core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
-
-TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
- TestController testController;
- std::vector<uint8_t> buffer;
- for (auto c : "Hello World\nHello Warld\nGoodByte Cruel world") {
- buffer.push_back(c);
- }
- std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
-
- content_repo->initialize(std::make_shared<minifi::Configure>());
-
- std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory
= minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
- org::apache::nifi::minifi::io::RandomServerSocket
server(org::apache::nifi::minifi::io::Socket::getMyHostName());
-
-
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
- LogTestController::getInstance().setDebug<minifi::processors::GetTCP>();
- LogTestController::getInstance().setTrace<minifi::io::Socket>();
-
- std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-
- auto processor =
std::make_unique<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample");
-
- auto logAttribute =
std::make_unique<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- processor->setStreamFactory(stream_factory);
- processor->initialize();
-
- utils::Identifier processoruuid = processor->getUUID();
- REQUIRE(processoruuid);
-
- utils::Identifier logattribute_uuid = logAttribute->getUUID();
- REQUIRE(logattribute_uuid);
-
- REQUIRE(processoruuid.to_string() != logattribute_uuid.to_string());
-
- auto connection = std::make_unique<minifi::Connection>(repo, content_repo,
"gettcpexampleConnection");
- connection->addRelationship(core::Relationship("success", "description"));
-
- auto connection2 = std::make_unique<minifi::Connection>(repo, content_repo,
"logattribute");
- connection2->addRelationship(core::Relationship("success", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(processor.get());
-
- // link the connections so that we can test results at the end for this
- connection->setDestination(logAttribute.get());
- connection2->setSource(logAttribute.get());
-
- connection2->setSourceUUID(logattribute_uuid);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logattribute_uuid);
-
- processor->addConnection(connection.get());
- logAttribute->addConnection(connection.get());
- logAttribute->addConnection(connection2.get());
-
- auto node = std::make_shared<core::ProcessorNode>(processor.get());
- auto node2 = std::make_shared<core::ProcessorNode>(logAttribute.get());
- auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
- auto context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo,
repo, content_repo);
-
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList,
org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" +
std::to_string(server.getPort()));
-
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval,
"200 msec");
-
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ConnectionAttemptLimit,
"10");
- auto session = std::make_shared<core::ProcessSession>(context);
- auto session2 = std::make_shared<core::ProcessSession>(context2);
-
- REQUIRE(processor->getName() == "gettcpexample");
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessSessionFactory> factory =
std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- processor->onTrigger(context, session);
- server.write(buffer, buffer.size());
- std::this_thread::sleep_for(std::chrono::seconds(2));
-
- logAttribute->initialize();
- logAttribute->incrementActiveTasks();
- logAttribute->setScheduledState(core::ScheduledState::RUNNING);
- std::shared_ptr<core::ProcessSessionFactory> factory2 =
std::make_shared<core::ProcessSessionFactory>(context2);
- logAttribute->onSchedule(context2, factory2);
- logAttribute->onTrigger(context2, session2);
+#include "Catch.h"
+#include "processors/GetTCP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
+#include "utils/gsl.h"
- auto reporter = session->getProvenanceReporter();
- auto records = reporter->getEvents();
- record = session->get();
- REQUIRE(record == nullptr);
- REQUIRE(records.empty());
+using GetTCP = org::apache::nifi::minifi::processors::GetTCP;
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(context, session);
- reporter = session->getProvenanceReporter();
+using namespace std::literals::chrono_literals;
- session->commit();
+namespace org::apache::nifi::minifi::test {
- logAttribute->incrementActiveTasks();
- logAttribute->setScheduledState(core::ScheduledState::RUNNING);
- logAttribute->onTrigger(context2, session2);
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
+ CHECK(std::to_string(port) == flow_file.getAttribute("tcp.port"));
+ const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
+ CHECK(ranges::contains(local_addresses,
flow_file.getAttribute("tcp.sender")));
+}
- REQUIRE(true == LogTestController::getInstance().contains("Reconnect
interval is 200 ms"));
- REQUIRE(true == LogTestController::getInstance().contains("Size:45
Offset:0"));
+minifi::utils::net::SslData createSslDataForServer() {
+ const std::filesystem::path executable_dir =
minifi::utils::file::FileUtils::get_executable_dir();
+ minifi::utils::net::SslData ssl_data;
+ ssl_data.ca_loc = (executable_dir / "resources" / "ca_A.crt").string();
+ ssl_data.cert_loc = (executable_dir / "resources" /
"localhost_by_A.pem").string();
+ ssl_data.key_loc = (executable_dir / "resources" /
"localhost_by_A.pem").string();
+ return ssl_data;
+}
- LogTestController::getInstance().reset();
+void addSslContextServiceTo(SingleProcessorTestController& controller) {
+ auto ssl_context_service =
controller.plan->addController("SSLContextService", "SSLContextService");
+ LogTestController::getInstance().setTrace<GetTCP>();
+ const auto executable_dir =
minifi::utils::file::FileUtils::get_executable_dir();
+ REQUIRE(controller.plan->setProperty(ssl_context_service,
controllers::SSLContextService::CACertificate.getName(), (executable_dir /
"resources" / "ca_A.crt").string()));
+ REQUIRE(controller.plan->setProperty(ssl_context_service,
controllers::SSLContextService::ClientCertificate.getName(), (executable_dir /
"resources" / "alice_by_A.pem").string()));
+ REQUIRE(controller.plan->setProperty(ssl_context_service,
controllers::SSLContextService::PrivateKey.getName(), (executable_dir /
"resources" / "alice_by_A.pem").string()));
+ ssl_context_service->enable();
}
-TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
- std::vector<uint8_t> buffer;
- for (auto c : "Hello World\nHello Warld\nGoodByte Cruel world") {
- buffer.push_back(c);
+class TcpTestServer {
+ public:
+ void run() {
+ server_thread_ = std::thread([&]() {
+ asio::co_spawn(io_context_, listenAndSendMessages(), asio::detached);
+ io_context_.run();
+ });
}
- std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
-
- content_repo->initialize(std::make_shared<minifi::Configure>());
-
- std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory
= minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
-
- TestController testController;
-
- org::apache::nifi::minifi::io::RandomServerSocket
server(org::apache::nifi::minifi::io::Socket::getMyHostName());
-
-
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
-
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository
>();
- LogTestController::getInstance().setTrace<minifi::processors::GetTCP>();
- LogTestController::getInstance().setTrace<core::ConfigurableComponent>();
- LogTestController::getInstance().setTrace<minifi::io::Socket>();
-
- std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor =
std::make_shared<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample");
- std::shared_ptr<core::Processor> logAttribute =
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- processor->setStreamFactory(stream_factory);
- processor->initialize();
-
- utils::Identifier processoruuid = processor->getUUID();
- REQUIRE(processoruuid);
-
- utils::Identifier logattribute_uuid = logAttribute->getUUID();
- REQUIRE(logattribute_uuid);
-
- auto connection = std::make_unique<minifi::Connection>(repo, content_repo,
"gettcpexampleConnection");
- connection->addRelationship(core::Relationship("partial", "description"));
-
- auto connection2 = std::make_unique<minifi::Connection>(repo, content_repo,
"logattribute");
- connection2->addRelationship(core::Relationship("partial", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(processor.get());
-
- // link the connections so that we can test results at the end for this
- connection->setDestination(logAttribute.get());
-
- connection2->setSource(logAttribute.get());
-
- connection2->setSourceUUID(logattribute_uuid);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logattribute_uuid);
-
- processor->addConnection(connection.get());
- logAttribute->addConnection(connection.get());
- logAttribute->addConnection(connection2.get());
-
- auto node = std::make_shared<core::ProcessorNode>(processor.get());
- auto node2 = std::make_shared<core::ProcessorNode>(logAttribute.get());
- auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
- auto context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo,
repo, content_repo);
-
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList,
org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" +
std::to_string(server.getPort()));
-
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval,
"200 msec");
-
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ConnectionAttemptLimit,
"10");
- // we're using new lines above
-
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndOfMessageByte,
"10");
- auto session = std::make_shared<core::ProcessSession>(context);
- auto session2 = std::make_shared<core::ProcessSession>(context2);
-
-
- REQUIRE(processor->getName() == "gettcpexample");
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessSessionFactory> factory =
std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- processor->onTrigger(context, session);
- server.write(buffer, buffer.size());
- std::this_thread::sleep_for(std::chrono::seconds(2));
+ void queueMessage(std::string message) {
+ messages_to_send_.enqueue(std::move(message));
+ }
- logAttribute->initialize();
- logAttribute->incrementActiveTasks();
- logAttribute->setScheduledState(core::ScheduledState::RUNNING);
- std::shared_ptr<core::ProcessSessionFactory> factory2 =
std::make_shared<core::ProcessSessionFactory>(context2);
- logAttribute->onSchedule(context2, factory2);
- logAttribute->onTrigger(context2, session2);
+ void enableSSL() {
+ const std::filesystem::path executable_dir =
minifi::utils::file::FileUtils::get_executable_dir();
- auto reporter = session->getProvenanceReporter();
- auto records = reporter->getEvents();
- record = session->get();
- REQUIRE(record == nullptr);
- REQUIRE(records.empty());
+ asio::ssl::context ssl_context(asio::ssl::context::tls_server);
+ ssl_context.set_options(asio::ssl::context::default_workarounds |
asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 |
asio::ssl::context::no_tlsv1_1);
+ ssl_context.set_password_callback([key_pw = "Password12"](std::size_t&,
asio::ssl::context_base::password_purpose&) { return key_pw; });
+ ssl_context.use_certificate_file((executable_dir / "resources" /
"localhost_by_A.pem").string(), asio::ssl::context::pem);
+ ssl_context.use_private_key_file((executable_dir / "resources" /
"localhost_by_A.pem").string(), asio::ssl::context::pem);
+ ssl_context.load_verify_file((executable_dir / "resources" /
"ca_A.crt").string());
+ ssl_context.set_verify_mode(asio::ssl::verify_peer);
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(context, session);
- reporter = session->getProvenanceReporter();
+ ssl_context_ = std::move(ssl_context);
+ }
- session->commit();
+ uint16_t getPort() const {
+ return port_;
+ }
- logAttribute->incrementActiveTasks();
- logAttribute->setScheduledState(core::ScheduledState::RUNNING);
- logAttribute->onTrigger(context2, session2);
+ ~TcpTestServer() {
+ io_context_.stop();
+ if (server_thread_.joinable())
+ server_thread_.join();
+ }
- logAttribute->incrementActiveTasks();
- logAttribute->setScheduledState(core::ScheduledState::RUNNING);
- logAttribute->onTrigger(context2, session2);
+ private:
+ asio::awaitable<void> sendMessages(auto& socket) {
+ while (true) {
+ std::string message_to_send;
+ if (!messages_to_send_.tryDequeue(message_to_send)) {
+ co_await minifi::utils::net::async_wait(10ms);
+ continue;
+ }
+ co_await asio::async_write(socket, asio::buffer(message_to_send),
minifi::utils::net::use_nothrow_awaitable);
Review Comment:
How is this `while (true)` loop in `sendMessages` terminated?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]