szaszm commented on a change in pull request #776:
URL: https://github.com/apache/nifi-minifi-cpp/pull/776#discussion_r427436157



##########
File path: libminifi/include/c2/C2Agent.h
##########
@@ -41,8 +41,15 @@ namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
+
+namespace utils {
+template <typename T>
+class ConditionConcurrentQueue;
+}

Review comment:
       Why don't we include the concurrent queue header? `C2Agent` needs the
definition, not just the declaration. It probably compiles only because we
accidentally include it anyway (ThreadPool?), but we should include it
explicitly if we use it.
   
   https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rs-implicit

##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const 
std::shared_ptr<core::controller::ControllerServiceProvid
 
   c2_producer_ = [&]() {
     // place priority on messages to send to the c2 server
-      if (protocol_.load() != nullptr && 
request_mutex.try_lock_for(std::chrono::seconds(1))) {
-        std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
-        if (!requests.empty()) {
-          int count = 0;
-          do {
-            const C2Payload payload(std::move(requests.back()));
-            requests.pop_back();
-            try {
-              C2Payload && response = 
protocol_.load()->consumePayload(payload);
-              enqueue_c2_server_response(std::move(response));
-            }
-            catch(const std::exception &e) {
-              logger_->log_error("Exception occurred while consuming payload. 
error: %s", e.what());
-            }
-            catch(...) {
-              logger_->log_error("Unknonwn exception occurred while consuming 
payload.");
-            }
-          }while(!requests.empty() && ++count < max_c2_responses);
+    if (protocol_.load() != nullptr) {
+      std::vector<C2Payload> payload_batch;
+      payload_batch.reserve(max_c2_responses);
+      auto getRequestPayload = [&payload_batch] (C2Payload&& payload) { 
payload_batch.emplace_back(std::move(payload)); };
+      const std::chrono::system_clock::time_point timeout_point = 
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+      for (std::size_t attempt_num = 0; attempt_num < max_c2_responses; 
++attempt_num) {
+        if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {

Review comment:
       Previously we took only the immediately available payloads. Now we wait
1ms. Why was this changed?
   
   If we reverted to the old way, it would also no longer be a problem to 
reunify the first two loops and eliminate the need for the temporary buffer 
(`payload_batch`).

##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const 
std::shared_ptr<core::controller::ControllerServiceProvid
 
   c2_producer_ = [&]() {
     // place priority on messages to send to the c2 server
-      if (protocol_.load() != nullptr && 
request_mutex.try_lock_for(std::chrono::seconds(1))) {
-        std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
-        if (!requests.empty()) {
-          int count = 0;
-          do {
-            const C2Payload payload(std::move(requests.back()));
-            requests.pop_back();
-            try {
-              C2Payload && response = 
protocol_.load()->consumePayload(payload);
-              enqueue_c2_server_response(std::move(response));
-            }
-            catch(const std::exception &e) {
-              logger_->log_error("Exception occurred while consuming payload. 
error: %s", e.what());
-            }
-            catch(...) {
-              logger_->log_error("Unknonwn exception occurred while consuming 
payload.");
-            }
-          }while(!requests.empty() && ++count < max_c2_responses);
+    if (protocol_.load() != nullptr) {
+      std::vector<C2Payload> payload_batch;
+      payload_batch.reserve(max_c2_responses);
+      auto getRequestPayload = [&payload_batch] (C2Payload&& payload) { 
payload_batch.emplace_back(std::move(payload)); };
+      const std::chrono::system_clock::time_point timeout_point = 
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+      for (std::size_t attempt_num = 0; attempt_num < max_c2_responses; 
++attempt_num) {
+        if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {
+          break;
         }
       }
-      try {
-        performHeartBeat();
-      }
-      catch(const std::exception &e) {
-        logger_->log_error("Exception occurred while performing heartbeat. 
error: %s", e.what());
-      }
-      catch(...) {
-        logger_->log_error("Unknonwn exception occurred while performing 
heartbeat.");
-      }
+      std::for_each(
+        std::make_move_iterator(payload_batch.begin()),
+        std::make_move_iterator(payload_batch.end()),
+        [&] (C2Payload&& payload) {
+          try {
+            C2Payload && response = 
protocol_.load()->consumePayload(std::move(payload));
+            enqueue_c2_server_response(std::move(response));
+          }
+          catch(const std::exception &e) {
+            logger_->log_error("Exception occurred while consuming payload. 
error: %s", e.what());
+          }
+          catch(...) {
+            logger_->log_error("Unknonwn exception occurred while consuming 
payload.");
+          }
+        });
 
-      checkTriggers();
+        try {
+          performHeartBeat();
+        }
+        catch (const std::exception &e) {
+          logger_->log_error("Exception occurred while performing heartbeat. 
error: %s", e.what());
+        }
+        catch (...) {
+          logger_->log_error("Unknonwn exception occurred while performing 
heartbeat.");
+        }
+    }
+
+    checkTriggers();
+
+    return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+  };
 
-      return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
-    };
   functions_.push_back(c2_producer_);
 
-  c2_consumer_ = [&]() {
-    if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) {
-      C2Payload payload(Operation::HEARTBEAT);
-      {
-        std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock);
-        if (responses.empty()) {
-          return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
-        }
-        payload = std::move(responses.back());
-        responses.pop_back();
+  c2_consumer_ = [&] {
+    if (responses.size()) {
+      if (!responses.consumeWaitFor([this](C2Payload&& e) { 
extractPayload(std::move(e)); }, std::chrono::seconds(1))) {

Review comment:
       The outer `if` should say what it means (`!responses.empty()`), and we
should use `operator&&` (logical AND) instead of nested `if` statements.
   
   The code doesn't make sense. Previously we called `extractPayload` with the 
consumed object. Now we call it with that AND with an empty meaningless 
temporary. This must be a bug, please fix!
   
   Now we can also wait indefinitely for the lock, but I don't think there's 
high contention so I don't mind that.

##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const 
std::shared_ptr<core::controller::ControllerServiceProvid
 
   c2_producer_ = [&]() {
     // place priority on messages to send to the c2 server
-      if (protocol_.load() != nullptr && 
request_mutex.try_lock_for(std::chrono::seconds(1))) {
-        std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
-        if (!requests.empty()) {
-          int count = 0;
-          do {
-            const C2Payload payload(std::move(requests.back()));
-            requests.pop_back();
-            try {
-              C2Payload && response = 
protocol_.load()->consumePayload(payload);
-              enqueue_c2_server_response(std::move(response));
-            }
-            catch(const std::exception &e) {
-              logger_->log_error("Exception occurred while consuming payload. 
error: %s", e.what());
-            }
-            catch(...) {
-              logger_->log_error("Unknonwn exception occurred while consuming 
payload.");
-            }
-          }while(!requests.empty() && ++count < max_c2_responses);
+    if (protocol_.load() != nullptr) {
+      std::vector<C2Payload> payload_batch;
+      payload_batch.reserve(max_c2_responses);
+      auto getRequestPayload = [&payload_batch] (C2Payload&& payload) { 
payload_batch.emplace_back(std::move(payload)); };
+      const std::chrono::system_clock::time_point timeout_point = 
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+      for (std::size_t attempt_num = 0; attempt_num < max_c2_responses; 
++attempt_num) {
+        if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {
+          break;
         }
       }
-      try {
-        performHeartBeat();
-      }
-      catch(const std::exception &e) {
-        logger_->log_error("Exception occurred while performing heartbeat. 
error: %s", e.what());
-      }
-      catch(...) {
-        logger_->log_error("Unknonwn exception occurred while performing 
heartbeat.");
-      }
+      std::for_each(
+        std::make_move_iterator(payload_batch.begin()),
+        std::make_move_iterator(payload_batch.end()),
+        [&] (C2Payload&& payload) {
+          try {
+            C2Payload && response = 
protocol_.load()->consumePayload(std::move(payload));
+            enqueue_c2_server_response(std::move(response));
+          }
+          catch(const std::exception &e) {
+            logger_->log_error("Exception occurred while consuming payload. 
error: %s", e.what());
+          }
+          catch(...) {
+            logger_->log_error("Unknonwn exception occurred while consuming 
payload.");
+          }
+        });
 
-      checkTriggers();
+        try {
+          performHeartBeat();
+        }
+        catch (const std::exception &e) {
+          logger_->log_error("Exception occurred while performing heartbeat. 
error: %s", e.what());
+        }
+        catch (...) {
+          logger_->log_error("Unknonwn exception occurred while performing 
heartbeat.");
+        }

Review comment:
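       Incorrect/misleading indentation: this `try`/`catch` block is indented
one level deeper than the other statements in the enclosing `if` block.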




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to