fgerlits commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r489443894



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,111 @@ void ConsumeWindowsEventLog::onSchedule(const 
std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring 
&bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::tuple<size_t, std::wstring> 
ConsumeWindowsEventLog::processEventLogs(const 
std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE& 
event_query_results) {
+  size_t processed_event_count = 0;
+  std::wstring bookmark_xml;
+  logger_->log_trace("Enumerating the events in the result set after the 
bookmarked event.");
+  while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 
0) {
+    EVT_HANDLE next_event{};
+    DWORD handles_set_count{};
+    if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0, 
&handles_set_count)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LogWindowsError("Failed to get next event");
+        continue;
+        /* According to MS this iteration should only end when the return 
value is false AND
+          the error code is NO_MORE_ITEMS. See the following page for further 
details:
+          
https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+      }
+      break;
+    }
+
+    const auto guard_next_event = gsl::finally([next_event]() { 
EvtClose(next_event); });
+    logger_->log_trace("Succesfully got the next event, performing event 
rendering");
+    EventRender event_render;
+    std::wstring new_bookmark_xml;
+    if (createEventRender(next_event, event_render) && 
bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+      bookmark_xml = std::move(new_bookmark_xml);
+      processed_event_count++;
+      putEventRenderFlowFileToSession(event_render, *session);
+    }
+  }
+  logger_->log_trace("Finished enumerating events.");
+  return std::make_tuple(processed_event_count, bookmark_xml);
+}
 
 void ConsumeWindowsEventLog::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
-  if (!pBookmark_) {
-    logger_->log_debug("pBookmark_ is null");
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
     context->yield();
     return;
   }
 
-  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing 
finished, configuration should be revised!");
     return;
   }
 
   logger_->log_trace("CWEL onTrigger");
 
-  struct TimeDiff {
-    auto operator()() const {
-      return int64_t{ 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - time_).count() };
-    }
-    const decltype(std::chrono::steady_clock::now()) time_ = 
std::chrono::steady_clock::now();
-  };
-
-  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
-    const TimeDiff timeDiff;
-    session->commit();
-    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
-    const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
-    if (!successful_save) {
-      logger_->log_error("Failed to save bookmark xml");
-    }
-
-    if (session->outgoingConnectionsFull("success")) {
-      logger_->log_debug("outgoingConnectionsFull");
-      return false;
-    }
-
-    return true;
-  };
-
-  size_t eventCount = 0;
-  const TimeDiff timeDiff;
+  size_t processed_event_count = 0;
+  const TimeDiff time_diff;
   const auto timeGuard = gsl::finally([&]() {
-    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, 
timeDiff());
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", 
processed_event_count, time_diff());
   });
 
-  size_t commitAndSaveBookmarkCount = 0;
-  std::wstring bookmarkXml;
-
-  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), 
wstrQuery_.c_str(), EvtQueryChannelPath);
-  if (!hEventResults) {
+  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), 
wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!event_query_results) {
     LOG_LAST_ERROR(EvtQuery);
     context->yield();
     return;
   }
-  const auto guard_hEventResults = gsl::finally([hEventResults]() { 
EvtClose(hEventResults); });
+  const auto guard_event_query_results = gsl::finally([event_query_results]() 
{ EvtClose(event_query_results); });
 
   logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", 
wstrChannel_.c_str(), wstrQuery_.c_str());
 
-  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
-  if (!hBookmark) {
-    logger_->log_error("hBookmark is null, unrecoverable error!"); 
-    pBookmark_.reset();
+  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+  if (!bookmark_handle) {
+    logger_->log_error("bookmark_handle is null, unrecoverable error!");
+    bookmark_.reset();
     context->yield();
     return;
   }
 
-  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, 
EvtSeekRelativeToBookmark)) {
     LOG_LAST_ERROR(EvtSeek);
     context->yield();
     return;
   }
 
   refreshTimeZoneData();
 
-  logger_->log_trace("Enumerating the events in the result set after the 
bookmarked event.");
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0, 
&dwReturned)) {
-      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
-        LogWindowsError("Failed to get next event");
-        continue; 
-        /* According to MS this iteration should only end when the return 
value is false AND 
-         the error code is NO_MORE_ITEMS. See the following page for further 
details:
-         
https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
-      }
-      break;
-    }
-    const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
-    logger_->log_trace("Succesfully get the next hEvent, performing event 
rendering");
-    EventRender eventRender;
-    std::wstring newBookmarkXml;
-    if (createEventRender(hEvent, eventRender) && 
pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
-      bookmarkXml = std::move(newBookmarkXml);
-      eventCount++;
-      putEventRenderFlowFileToSession(eventRender, *session);
-
-      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
-        if (!commitAndSaveBookmark(bookmarkXml)) {
-          context->yield();
-          return;
-        }
-
-        commitAndSaveBookmarkCount = eventCount;
-      }
-    }
-  }
-
-  logger_->log_trace("Finish enumerating events.");
+  std::wstring bookmark_xml;
+  std::tie(processed_event_count, bookmark_xml) = processEventLogs(context, 
session, event_query_results);
 
-  if (eventCount > commitAndSaveBookmarkCount) {
-    commitAndSaveBookmark(bookmarkXml);
+  if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml, 
session)) {
+    context->yield();

Review comment:
       I think we generally want to yield if there is nothing to do, so we wait 
a bit longer between `onTrigger()` calls and we don't waste resources.  
@adebreceni Do you know why GetFile does not yield when it has nothing to do?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to