adamdebreceni commented on a change in pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#discussion_r592980496
##########
File path: extensions/standard-processors/tests/unit/ProcessorTests.cpp
##########
@@ -615,65 +615,131 @@ TEST_CASE("TestRPGValid", "[TestRPG6]") {
testRPGBypass("", "8080", "8080", false);
}
-TEST_CASE("A Processor detects correctly if it has incoming flow files it can
process", "[isWorkAvailable]") {
+namespace {
+
+class ProcessorWithIncomingConnectionTest {
+ public:
+ ProcessorWithIncomingConnectionTest();
+ ~ProcessorWithIncomingConnectionTest();
+
+ protected:
+ std::shared_ptr<core::Processor> processor_;
+ std::shared_ptr<minifi::Connection> incoming_connection_;
+ std::shared_ptr<core::ProcessSession> session_;
+};
+
+ProcessorWithIncomingConnectionTest::ProcessorWithIncomingConnectionTest() {
LogTestController::getInstance().setDebug<core::Processor>();
const auto repo = std::make_shared<TestRepository>();
const auto content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(std::make_shared<minifi::Configure>());
- const std::shared_ptr<core::Processor> processor =
std::make_shared<processors::LogAttribute>("test_processor");
- const auto incoming_connection = std::make_shared<minifi::Connection>(repo,
content_repo, "incoming_connection");
- incoming_connection->addRelationship(core::Relationship{"success", ""});
- incoming_connection->setDestinationUUID(processor->getUUID());
- processor->addConnection(incoming_connection);
- processor->initialize();
+ processor_ = std::make_shared<processors::LogAttribute>("test_processor");
+ incoming_connection_ = std::make_shared<minifi::Connection>(repo,
content_repo, "incoming_connection");
+ incoming_connection_->addRelationship(core::Relationship{"success", ""});
+ incoming_connection_->setDestinationUUID(processor_->getUUID());
+ processor_->addConnection(incoming_connection_);
+ processor_->initialize();
- const auto processor_node = std::make_shared<core::ProcessorNode>(processor);
+ const auto processor_node =
std::make_shared<core::ProcessorNode>(processor_);
const auto context = std::make_shared<core::ProcessContext>(processor_node,
nullptr, repo, repo, content_repo);
const auto session_factory =
std::make_shared<core::ProcessSessionFactory>(context);
- const auto session = session_factory->createSession();
+ session_ = session_factory->createSession();
+}
+ProcessorWithIncomingConnectionTest::~ProcessorWithIncomingConnectionTest() {
+ LogTestController::getInstance().reset();
+}
+
+} // namespace
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A Processor detects
correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
SECTION("Initially, the queue is empty, so there is no work available") {
- REQUIRE_FALSE(processor->isWorkAvailable());
+ REQUIRE_FALSE(processor_->isWorkAvailable());
}
SECTION("When a non-penalized flow file is queued, there is work available")
{
- const auto flow_file = session->create();
- incoming_connection->put(flow_file);
+ const auto flow_file = session_->create();
+ incoming_connection_->put(flow_file);
- REQUIRE(processor->isWorkAvailable());
+ REQUIRE(processor_->isWorkAvailable());
}
SECTION("When a penalized flow file is queued, there is no work available
(until the penalty expires)") {
- const auto flow_file = session->create();
- session->penalize(flow_file);
- incoming_connection->put(flow_file);
+ const auto flow_file = session_->create();
+ session_->penalize(flow_file);
+ incoming_connection_->put(flow_file);
- REQUIRE_FALSE(processor->isWorkAvailable());
+ REQUIRE_FALSE(processor_->isWorkAvailable());
}
SECTION("If there is both a penalized and a non-penalized flow file queued,
there is work available") {
- const auto normal_flow_file = session->create();
- incoming_connection->put(normal_flow_file);
+ const auto normal_flow_file = session_->create();
+ incoming_connection_->put(normal_flow_file);
- const auto penalized_flow_file = session->create();
- session->penalize(penalized_flow_file);
- incoming_connection->put(penalized_flow_file);
+ const auto penalized_flow_file = session_->create();
+ session_->penalize(penalized_flow_file);
+ incoming_connection_->put(penalized_flow_file);
- REQUIRE(processor->isWorkAvailable());
+ REQUIRE(processor_->isWorkAvailable());
}
SECTION("When a penalized flow file is queued, there is work available after
the penalty expires") {
- processor->setPenalizationPeriodMsec(10);
+ processor_->setPenalizationPeriod(std::chrono::milliseconds{10});
- const auto flow_file = session->create();
- session->penalize(flow_file);
- incoming_connection->put(flow_file);
+ const auto flow_file = session_->create();
+ session_->penalize(flow_file);
+ incoming_connection_->put(flow_file);
- REQUIRE_FALSE(processor->isWorkAvailable());
+ REQUIRE_FALSE(processor_->isWorkAvailable());
const auto penalty_has_expired = [flow_file] { return
!flow_file->isPenalized(); };
REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1},
penalty_has_expired, std::chrono::milliseconds{10}));
- REQUIRE(processor->isWorkAvailable());
+ REQUIRE(processor_->isWorkAvailable());
}
}
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A failed and
re-penalized flow file does not block the incoming queue of the Processor",
"[penalize]") {
+ processor_->setPenalizationPeriod(std::chrono::milliseconds{100});
+ const auto penalized_flow_file = session_->create();
+ session_->penalize(penalized_flow_file); // first penalty duration is 100 ms
+ incoming_connection_->put(penalized_flow_file);
+ const auto penalty_has_expired = [penalized_flow_file] { return
!penalized_flow_file->isPenalized(); };
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1},
penalty_has_expired, std::chrono::milliseconds{10}));
+
+ const auto flow_file_1 = session_->create();
+ incoming_connection_->put(flow_file_1);
+ const auto flow_file_2 = session_->create();
+ incoming_connection_->put(flow_file_2);
+ const auto flow_file_3 = session_->create();
+ incoming_connection_->put(flow_file_3);
+
+ REQUIRE(incoming_connection_->isWorkAvailable());
+ std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;
+ const auto next_flow_file_1 = incoming_connection_->poll(expired_flow_files);
+ REQUIRE(next_flow_file_1 == penalized_flow_file);
+
+ session_->penalize(penalized_flow_file); // second penalty duration is 200
ms
Review comment:
does this comment apply?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]