szaszm commented on a change in pull request #776:
URL: https://github.com/apache/nifi-minifi-cpp/pull/776#discussion_r428310454



##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const 
std::shared_ptr<core::controller::ControllerServiceProvid
 
   c2_producer_ = [&]() {
     // place priority on messages to send to the c2 server
-      if (protocol_.load() != nullptr && 
request_mutex.try_lock_for(std::chrono::seconds(1))) {
-        std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
-        if (!requests.empty()) {
-          int count = 0;
-          do {
-            const C2Payload payload(std::move(requests.back()));
-            requests.pop_back();
-            try {
-              C2Payload && response = 
protocol_.load()->consumePayload(payload);
-              enqueue_c2_server_response(std::move(response));
-            }
-            catch(const std::exception &e) {
-              logger_->log_error("Exception occurred while consuming payload. 
error: %s", e.what());
-            }
-            catch(...) {
-              logger_->log_error("Unknonwn exception occurred while consuming 
payload.");
-            }
-          }while(!requests.empty() && ++count < max_c2_responses);
+    if (protocol_.load() != nullptr) {
+      std::vector<C2Payload> payload_batch;
+      payload_batch.reserve(max_c2_responses);
+      auto getRequestPayload = [&payload_batch] (C2Payload&& payload) { 
payload_batch.emplace_back(std::move(payload)); };
+      const std::chrono::system_clock::time_point timeout_point = 
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+      for (std::size_t attempt_num = 0; attempt_num < max_c2_responses; 
++attempt_num) {
+        if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {

Review comment:
       The mutex inside the queue is not `timed_mutex` so we wait indefinitely 
for locking and only wait definitely for content.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to