szaszm commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r590393358



##########
File path: libminifi/src/Connection.cpp
##########
@@ -179,56 +171,44 @@ void 
Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
 }
 
 std::shared_ptr<core::FlowFile> 
Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) 
{
-  std::lock_guard<std::mutex> lock(mutex_);
-
-  while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.front();
-    queue_.pop();
-    queued_data_size_ -= item->getSize();
+  auto lockedQueue = queue_.lock();
+
+  while (!lockedQueue.empty()) {
+    std::shared_ptr<core::FlowFile> item = lockedQueue.front();
+
+    // We need to check for flow expiration
+    if (expired_duration_ > 0 && utils::timeutils::getTimeMillis() > 
(item->getEntryDate() + expired_duration_)) {
+      // Flow record expired
+      lockedQueue.pop();
+      queued_data_size_ -= item->getSize();
+      expiredFlowRecords.insert(item);
+      logger_->log_debug("Delete flow file UUID %s from connection %s, because 
it expired", item->getUUIDStr(), name_);
+      continue;
+    }
 
-    if (expired_duration_ > 0) {
-      // We need to check for flow expiration
-      if (utils::timeutils::getTimeMillis() > (item->getEntryDate() + 
expired_duration_)) {
-        // Flow record expired
-        expiredFlowRecords.insert(item);
-        logger_->log_debug("Delete flow file UUID %s from connection %s, 
because it expired", item->getUUIDStr(), name_);
-      } else {
-        // Flow record not expired
-        if (item->isPenalized()) {
-          // Flow record was penalized
-          queue_.push(item);
-          queued_data_size_ += item->getSize();
-          break;
-        }
-        std::shared_ptr<Connectable> connectable = 
std::static_pointer_cast<Connectable>(shared_from_this());
-        item->setConnection(connectable);
-        logger_->log_debug("Dequeue flow file UUID %s from connection %s", 
item->getUUIDStr(), name_);
-        return item;
-      }
-    } else {
-      // Flow record not expired
-      if (item->isPenalized()) {
-        // Flow record was penalized
-        queue_.push(item);
-        queued_data_size_ += item->getSize();
-        break;
-      }
-      std::shared_ptr<Connectable> connectable = 
std::static_pointer_cast<Connectable>(shared_from_this());
-      item->setConnection(connectable);
-      logger_->log_debug("Dequeue flow file UUID %s from connection %s", 
item->getUUIDStr(), name_);
-      return item;
+    // Flow record not expired
+    if (item->isPenalized()) {
+      // Flow record was penalized
+      break;

Review comment:
       This looks like a change in behavior. Previously, a penalized flow file 
at the head of the queue was popped and pushed again, which moved it to the back 
of the queue; now it stays at the front. I think the original behavior was 
intentional, but I can't say so with 100% certainty. Unless we can be sure that 
this change doesn't affect behavior, I would opt for reverting to the old 
behavior of reinserting flow files.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to