szaszm commented on a change in pull request #776:
URL: https://github.com/apache/nifi-minifi-cpp/pull/776#discussion_r428573881
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const
std::shared_ptr<core::controller::ControllerServiceProvid
c2_producer_ = [&]() {
// place priority on messages to send to the c2 server
- if (protocol_.load() != nullptr &&
request_mutex.try_lock_for(std::chrono::seconds(1))) {
- std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
- if (!requests.empty()) {
- int count = 0;
- do {
- const C2Payload payload(std::move(requests.back()));
- requests.pop_back();
- try {
- C2Payload && response =
protocol_.load()->consumePayload(payload);
- enqueue_c2_server_response(std::move(response));
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while consuming
payload.");
- }
- }while(!requests.empty() && ++count < max_c2_responses);
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ const std::chrono::system_clock::time_point timeout_point =
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {
+ break;
}
}
- try {
- performHeartBeat();
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
- }
+ std::for_each(
+ std::make_move_iterator(payload_batch.begin()),
+ std::make_move_iterator(payload_batch.end()),
+ [&] (C2Payload&& payload) {
+ try {
+ C2Payload && response =
protocol_.load()->consumePayload(std::move(payload));
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknonwn exception occurred while consuming
payload.");
+ }
+ });
- checkTriggers();
+ try {
+ performHeartBeat();
+ }
+ catch (const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
+ }
+ catch (...) {
+ logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
+ }
+ }
+
+ checkTriggers();
+
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+ };
- return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
- };
functions_.push_back(c2_producer_);
- c2_consumer_ = [&]() {
- if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) {
- C2Payload payload(Operation::HEARTBEAT);
- {
- std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock);
- if (responses.empty()) {
- return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
- }
- payload = std::move(responses.back());
- responses.pop_back();
+ c2_consumer_ = [&] {
+ if (responses.size()) {
+ if (!responses.consumeWaitFor([this](C2Payload&& e) {
extractPayload(std::move(e)); }, std::chrono::seconds(1))) {
Review comment:
I don't think you can edit my suggestion (in my previous comment), but
you can manually apply the desired changes in your working directory, or I can
edit my comment for you if you specify what changes you want.
Do you want to keep the current suggestion except the comment, or change
something else, too?
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const
std::shared_ptr<core::controller::ControllerServiceProvid
c2_producer_ = [&]() {
// place priority on messages to send to the c2 server
- if (protocol_.load() != nullptr &&
request_mutex.try_lock_for(std::chrono::seconds(1))) {
- std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
- if (!requests.empty()) {
- int count = 0;
- do {
- const C2Payload payload(std::move(requests.back()));
- requests.pop_back();
- try {
- C2Payload && response =
protocol_.load()->consumePayload(payload);
- enqueue_c2_server_response(std::move(response));
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while consuming
payload.");
- }
- }while(!requests.empty() && ++count < max_c2_responses);
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ const std::chrono::system_clock::time_point timeout_point =
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {
+ break;
}
}
- try {
- performHeartBeat();
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
- }
+ std::for_each(
+ std::make_move_iterator(payload_batch.begin()),
+ std::make_move_iterator(payload_batch.end()),
+ [&] (C2Payload&& payload) {
+ try {
+ C2Payload && response =
protocol_.load()->consumePayload(std::move(payload));
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknonwn exception occurred while consuming
payload.");
+ }
+ });
- checkTriggers();
+ try {
+ performHeartBeat();
+ }
+ catch (const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
+ }
+ catch (...) {
+ logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
+ }
+ }
+
+ checkTriggers();
+
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+ };
- return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
- };
functions_.push_back(c2_producer_);
- c2_consumer_ = [&]() {
- if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) {
- C2Payload payload(Operation::HEARTBEAT);
- {
- std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock);
- if (responses.empty()) {
- return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
- }
- payload = std::move(responses.back());
- responses.pop_back();
+ c2_consumer_ = [&] {
+ if (responses.size()) {
+ if (!responses.consumeWaitFor([this](C2Payload&& e) {
extractPayload(std::move(e)); }, std::chrono::seconds(1))) {
Review comment:
1. My preference is `!empty()`, because it reads as "not empty", which
is the intention here. On the other hand I'm ok with using `size() > 0`, as
that's only one logical step away from the intention, but not plain `size()`
because that implicit int -> bool conversion is surprising to the reader, like
every implicit conversion that converts to something that's not logically "the
same thing". I think the reader's mind goes like this:
1. if the size of responses. wtf, that doesn't make sense
2. Ah, implicit int -> bool conversion, so "if the size of the responses
is not zero".
3. That's "if the responses are not empty", i.e. "if there are responses"
edit: After a bit of research, I am now even more in favor of empty vs size.
- Effective STL Item 4: Call `empty` instead of checking `size()` against
zero. (Scott Meyers, 2001)
The rationale is that `std::list` used to have linear-time `size()`.
Doesn't apply here, but it's easier to follow a simple guideline than to evaluate
the situation every time.
- C++ Core Guidelines
https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#t143-dont-write-unintentionally-non-generic-code
Second point under Enforcement:
"Emptiness works for more containers than size(), because some containers
don’t know their size or are conceptually of unbounded size."
Again, doesn't apply here, but it's easier to follow the guideline.
- clang-tidy warns on size() instead of empty():
https://clang.llvm.org/extra/clang-tidy/checks/readability-container-size-empty.html
Same rationale as before, + "also shows clearer intent to use empty()"
tl;dr: I can still accept `size() > 0`, but my preference is explained above.
2. Sorry for the accusation, you're right about the second point. I must
have been salty because of something when writing this. :(
However, this raises another problem, which is that I, as the reader,
misunderstood the code, so it's probably too complex. I suggest extracting the
lambda and maybe even the consume call, so that the identifiers can guide the
reader.
3. \-
```suggestion
if (!responses.empty()) {
const auto on_dequeue = [this](C2Payload&& payload) {
extractPayload(std::move(payload)); };
const auto consume_success = responses.consumeWaitFor(on_dequeue,
std::chrono::seconds{1});
if (!consume_success) {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]