hunyadi-dev commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580382072



##########
File path: extensions/standard-processors/tests/unit/ProcessorTests.cpp
##########
@@ -614,3 +614,109 @@ TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") {
 TEST_CASE("TestRPGValid", "[TestRPG6]") {
   testRPGBypass("", "8080", "8080", false);
 }
+
+class TestProcessorProcessIncomingQueue : public minifi::core::Processor {
+ public:
+  explicit TestProcessorProcessIncomingQueue(std::string name, 
utils::Identifier uuid = {}) : Processor(name, uuid) {
+    setSupportedProperties({NumFlowFiles});
+  }
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>&) override {
+    context->getProperty(NumFlowFiles.getName(), num_flow_files_);
+  }
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext>&, const 
std::shared_ptr<core::ProcessSession>& session) override {
+    for (uint32_t i = 0; i < num_flow_files_; ++i) {
+      auto flow_file = session->get();
+      if (flow_file) {
+        session->remove(flow_file);
+      }
+    }
+  }
+
+  static core::Property NumFlowFiles;
+  uint32_t num_flow_files_ = 0;
+};
+
+core::Property TestProcessorProcessIncomingQueue::NumFlowFiles = 
core::PropertyBuilder::createProperty("Number of Flow Files to Process")
+    ->withDefaultValue<uint32_t>(0)
+    ->build();
+
+REGISTER_RESOURCE(TestProcessorProcessIncomingQueue, "A mock processor that 
processes a configurable number of incoming flow files");
+
+bool testAutomaticYieldWhenNoIncomingFlowFilesAreProcessed(uint32_t 
num_incoming_, uint32_t num_processed_) {
+  LogTestController::getInstance().setDebug<core::Processor>();
+  LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+
+  const auto repo = std::make_shared<TestRepository>();
+  const auto content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<minifi::Configure>());
+
+  const std::shared_ptr<core::Processor> generate_flow_file = 
std::make_shared<processors::GenerateFlowFile>("generate_flow_file");
+  const auto generate_flow_file_node = 
std::make_shared<core::ProcessorNode>(generate_flow_file);
+  generate_flow_file->initialize();
+  const auto gff_context = 
std::make_shared<core::ProcessContext>(generate_flow_file_node, nullptr, repo, 
repo, content_repo);
+  gff_context->setProperty(processors::GenerateFlowFile::BatchSize, 
std::to_string(num_incoming_));
+
+  const std::shared_ptr<core::Processor> process_incoming_queue = 
std::make_shared<TestProcessorProcessIncomingQueue>("process_incoming_queue");
+  const auto process_incoming_queue_node = 
std::make_shared<core::ProcessorNode>(process_incoming_queue);
+  process_incoming_queue->initialize();
+  const auto piq_context = 
std::make_shared<core::ProcessContext>(process_incoming_queue_node, nullptr, 
repo, repo, content_repo);
+  piq_context->setProperty(TestProcessorProcessIncomingQueue::NumFlowFiles, 
std::to_string(num_processed_));
+
+  const auto connection = std::make_shared<minifi::Connection>(repo, 
content_repo, "ggf_to_piq");
+  connection->addRelationship(core::Relationship{"success", ""});
+  connection->setSourceUUID(generate_flow_file->getUUID());
+  connection->setDestinationUUID(process_incoming_queue->getUUID());
+  generate_flow_file->addConnection(connection);
+  process_incoming_queue->addConnection(connection);
+
+  const auto gff_session_factory = 
std::make_shared<core::ProcessSessionFactory>(gff_context);
+  generate_flow_file->setScheduledState(core::ScheduledState::RUNNING);
+  generate_flow_file->onSchedule(gff_context, gff_session_factory);
+  generate_flow_file->onTrigger(gff_context, gff_session_factory);
+
+  const auto piq_session_factory = 
std::make_shared<core::ProcessSessionFactory>(piq_context);
+  process_incoming_queue->setScheduledState(core::ScheduledState::RUNNING);
+  process_incoming_queue->onSchedule(piq_context, piq_session_factory);
+  process_incoming_queue->onTrigger(piq_context, piq_session_factory);
+
+  return process_incoming_queue->isYield();
+}
+
+TEST_CASE("If there are no incoming flow files, then there is no automatic 
yield", "[AutomaticYield]") {
+  SECTION("0 flow files in the queue, we don't try to process any") {
+    auto is_yield = testAutomaticYieldWhenNoIncomingFlowFilesAreProcessed(0, 
0);
+    REQUIRE_FALSE(is_yield);
+  }
+  SECTION("0 flow files in the queue, we try to process one") {

Review comment:
       This description is a bit misleading, as we don't call 
`ProcessSession::remove` if there is nothing to process. It might also be worth 
updating this function not to take `nullptr` as its argument.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to