hunyadi-dev commented on a change in pull request #776:
URL: https://github.com/apache/nifi-minifi-cpp/pull/776#discussion_r428117653
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const
std::shared_ptr<core::controller::ControllerServiceProvid
c2_producer_ = [&]() {
// place priority on messages to send to the c2 server
- if (protocol_.load() != nullptr &&
request_mutex.try_lock_for(std::chrono::seconds(1))) {
- std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
- if (!requests.empty()) {
- int count = 0;
- do {
- const C2Payload payload(std::move(requests.back()));
- requests.pop_back();
- try {
- C2Payload && response =
protocol_.load()->consumePayload(payload);
- enqueue_c2_server_response(std::move(response));
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while consuming
payload.");
- }
- }while(!requests.empty() && ++count < max_c2_responses);
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ const std::chrono::system_clock::time_point timeout_point =
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {
+ break;
}
}
- try {
- performHeartBeat();
- }
- catch(const std::exception &e) {
- logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
- }
- catch(...) {
- logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
- }
+ std::for_each(
+ std::make_move_iterator(payload_batch.begin()),
+ std::make_move_iterator(payload_batch.end()),
+ [&] (C2Payload&& payload) {
+ try {
+ C2Payload && response =
protocol_.load()->consumePayload(std::move(payload));
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknonwn exception occurred while consuming
payload.");
+ }
+ });
- checkTriggers();
+ try {
+ performHeartBeat();
+ }
+ catch (const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
+ }
+ catch (...) {
+ logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
+ }
+ }
+
+ checkTriggers();
+
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+ };
- return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
- };
functions_.push_back(c2_producer_);
- c2_consumer_ = [&]() {
- if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) {
- C2Payload payload(Operation::HEARTBEAT);
- {
- std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock);
- if (responses.empty()) {
- return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
- }
- payload = std::move(responses.back());
- responses.pop_back();
+ c2_consumer_ = [&] {
+ if (responses.size()) {
+ if (!responses.consumeWaitFor([this](C2Payload&& e) {
extractPayload(std::move(e)); }, std::chrono::seconds(1))) {
Review comment:
I am probably wrong, but as far as I can see, this is the same behaviour:
- If no elements were present in the queue, we did not call
`extractPayload()` at all.
- If there were elements, but the queue had become busy by the time we wanted
to grab the lock, we waited up to 1 second and tried fetching an element:
  - If we succeeded within that time, we called `extractPayload()` with the
dequeued data.
  - Otherwise, we used the default-constructed payload, `C2Payload{
Operation::HEARTBEAT }`, instead.
Regarding the other point:
To me, `!empty()` feels like something that is easily mistaken for `empty()`.
Also, `size()` is guaranteed to be constant time for `std::deque`, and
`empty()` is probably implemented as `size() == 0` anyway, right?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]