szaszm commented on a change in pull request #776:
URL: https://github.com/apache/nifi-minifi-cpp/pull/776#discussion_r427436157
##########
File path: libminifi/include/c2/C2Agent.h
##########
@@ -41,8 +41,15 @@ namespace org {
namespace apache {
namespace nifi {
namespace minifi {
+
+namespace utils {
+template <typename T>
+class ConditionConcurrentQueue;
+}
Review comment:
Why don't we include the concurrent queue header? `C2Agent` needs the
definition, not just the declaration. It compiles probably because we
accidentally include it anyway (ThreadPool?), but we should include it
explicitly if we use it.
https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rs-implicit
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const
std::shared_ptr<core::controller::ControllerServiceProvid
c2_producer_ = [&]() {
// place priority on messages to send to the c2 server
- if (protocol_.load() != nullptr &&
request_mutex.try_lock_for(std::chrono::seconds(1))) {
- std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
- if (!requests.empty()) {
- int count = 0;
- do {
- const C2Payload payload(std::move(requests.back()));
- requests.pop_back();
- try {
- C2Payload && response =
protocol_.load()->consumePayload(payload);
- enqueue_c2_server_response(std::move(response));
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while consuming
payload.");
- }
- }while(!requests.empty() && ++count < max_c2_responses);
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ const std::chrono::system_clock::time_point timeout_point =
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {
Review comment:
Previously we took the immediately available payloads only. Now we wait
1ms. Why is this change?
If we reverted to the old way, it would also no longer be a problem to
reunify the first two loops and eliminate the need for the temporary buffer
(`payload_batch`).
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const
std::shared_ptr<core::controller::ControllerServiceProvid
c2_producer_ = [&]() {
// place priority on messages to send to the c2 server
- if (protocol_.load() != nullptr &&
request_mutex.try_lock_for(std::chrono::seconds(1))) {
- std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
- if (!requests.empty()) {
- int count = 0;
- do {
- const C2Payload payload(std::move(requests.back()));
- requests.pop_back();
- try {
- C2Payload && response =
protocol_.load()->consumePayload(payload);
- enqueue_c2_server_response(std::move(response));
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while consuming
payload.");
- }
- }while(!requests.empty() && ++count < max_c2_responses);
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ const std::chrono::system_clock::time_point timeout_point =
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {
+ break;
}
}
- try {
- performHeartBeat();
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
- }
+ std::for_each(
+ std::make_move_iterator(payload_batch.begin()),
+ std::make_move_iterator(payload_batch.end()),
+ [&] (C2Payload&& payload) {
+ try {
+ C2Payload && response =
protocol_.load()->consumePayload(std::move(payload));
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknonwn exception occurred while consuming
payload.");
+ }
+ });
- checkTriggers();
+ try {
+ performHeartBeat();
+ }
+ catch (const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
+ }
+ catch (...) {
+ logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
+ }
+ }
+
+ checkTriggers();
+
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+ };
- return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
- };
functions_.push_back(c2_producer_);
- c2_consumer_ = [&]() {
- if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) {
- C2Payload payload(Operation::HEARTBEAT);
- {
- std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock);
- if (responses.empty()) {
- return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
- }
- payload = std::move(responses.back());
- responses.pop_back();
+ c2_consumer_ = [&] {
+ if (responses.size()) {
+ if (!responses.consumeWaitFor([this](C2Payload&& e) {
extractPayload(std::move(e)); }, std::chrono::seconds(1))) {
Review comment:
The outer if should say what it means (`!responses.empty()`) and we
should use `operator&&` (logical AND) instead of nested if statements.
The code doesn't make sense. Previously we called `extractPayload` with the
consumed object. Now we call it with that AND with an empty meaningless
temporary. This must be a bug, please fix!
Now we can also wait indefinitely for the lock, but I don't think there's
high contention so I don't mind that.
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const
std::shared_ptr<core::controller::ControllerServiceProvid
c2_producer_ = [&]() {
// place priority on messages to send to the c2 server
- if (protocol_.load() != nullptr &&
request_mutex.try_lock_for(std::chrono::seconds(1))) {
- std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
- if (!requests.empty()) {
- int count = 0;
- do {
- const C2Payload payload(std::move(requests.back()));
- requests.pop_back();
- try {
- C2Payload && response =
protocol_.load()->consumePayload(payload);
- enqueue_c2_server_response(std::move(response));
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while consuming
payload.");
- }
- }while(!requests.empty() && ++count < max_c2_responses);
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ const std::chrono::system_clock::time_point timeout_point =
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {
+ break;
}
}
- try {
- performHeartBeat();
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
- }
+ std::for_each(
+ std::make_move_iterator(payload_batch.begin()),
+ std::make_move_iterator(payload_batch.end()),
+ [&] (C2Payload&& payload) {
+ try {
+ C2Payload && response =
protocol_.load()->consumePayload(std::move(payload));
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknonwn exception occurred while consuming
payload.");
+ }
+ });
- checkTriggers();
+ try {
+ performHeartBeat();
+ }
+ catch (const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
+ }
+ catch (...) {
+ logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
+ }
Review comment:
incorrect/misleading indentation
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]