dam4rus commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r590511563
##########
File path: libminifi/src/Connection.cpp
##########
@@ -179,56 +171,44 @@ void
Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
}
std::shared_ptr<core::FlowFile>
Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords)
{
- std::lock_guard<std::mutex> lock(mutex_);
-
- while (!queue_.empty()) {
- std::shared_ptr<core::FlowFile> item = queue_.front();
- queue_.pop();
- queued_data_size_ -= item->getSize();
+ auto lockedQueue = queue_.lock();
+
+ while (!lockedQueue.empty()) {
+ std::shared_ptr<core::FlowFile> item = lockedQueue.front();
+
+ // We need to check for flow expiration
+ if (expired_duration_ > 0 && utils::timeutils::getTimeMillis() >
(item->getEntryDate() + expired_duration_)) {
+ // Flow record expired
+ lockedQueue.pop();
+ queued_data_size_ -= item->getSize();
+ expiredFlowRecords.insert(item);
+ logger_->log_debug("Delete flow file UUID %s from connection %s, because
it expired", item->getUUIDStr(), name_);
+ continue;
+ }
- if (expired_duration_ > 0) {
- // We need to check for flow expiration
- if (utils::timeutils::getTimeMillis() > (item->getEntryDate() +
expired_duration_)) {
- // Flow record expired
- expiredFlowRecords.insert(item);
- logger_->log_debug("Delete flow file UUID %s from connection %s,
because it expired", item->getUUIDStr(), name_);
- } else {
- // Flow record not expired
- if (item->isPenalized()) {
- // Flow record was penalized
- queue_.push(item);
- queued_data_size_ += item->getSize();
- break;
- }
- std::shared_ptr<Connectable> connectable =
std::static_pointer_cast<Connectable>(shared_from_this());
- item->setConnection(connectable);
- logger_->log_debug("Dequeue flow file UUID %s from connection %s",
item->getUUIDStr(), name_);
- return item;
- }
- } else {
- // Flow record not expired
- if (item->isPenalized()) {
- // Flow record was penalized
- queue_.push(item);
- queued_data_size_ += item->getSize();
- break;
- }
- std::shared_ptr<Connectable> connectable =
std::static_pointer_cast<Connectable>(shared_from_this());
- item->setConnection(connectable);
- logger_->log_debug("Dequeue flow file UUID %s from connection %s",
item->getUUIDStr(), name_);
- return item;
+ // Flow record not expired
+ if (item->isPenalized()) {
+ // Flow record was penalized
+ break;
Review comment:
@fgerlits Your PR also changes the underlying queue in Connection, which
makes rebasing almost impossible. I wonder if with your changes does this PR
has any merit? At this point I think abandoning this ticket would be the best
option.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]