am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383786218
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +467,122 @@ void 
ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent, EventRender& 
renderedData) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, 
&propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered 
within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, 
&propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = 
doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker 
walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, 
!resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, 
mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or 
formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE 
hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, 
EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered 
within %ll bytes.", hEvent, maxBufferSize_);
     return false;
   }
 
-  // Enumerate the events in the result set after the bookmarked event.
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-      DWORD status = ERROR_SUCCESS;
-      if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
-        logger_->log_error("!EvtNext error %d.", status);
-      }
-      break;
-    }
-
-    processEvent(hEvent);
-
-    EvtClose(hEvent);
+  size = used;
+  std::vector<wchar_t> buf(size / 2 + 1);
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, 
&propertyCount)) {
+    logger_->log_error("!EvtRender error: %d.", GetLastError());
+    return false;
   }
 
-  return true;
-}
-
+  std::string xml = wel::to_string(&buf[0]);
 
-bool ConsumeWindowsEventLog::subscribe(const 
std::shared_ptr<core::ProcessContext> &context) {
-  context->getProperty(Channel.getName(), channel_);
-  context->getProperty(Query.getName(), query_);
+  pugi::xml_document doc;
+  pugi::xml_parse_result result = doc.load_string(xml.c_str());
 
-  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
-  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", 
maxBufferSize_);
-
-  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query_;
-
-  std::string strInactiveDurationToReconnect;
-  context->getProperty(InactiveDurationToReconnect.getName(), 
strInactiveDurationToReconnect);
-
-  // Get 'inactiveDurationToReconnect_'.
-  core::TimeUnit unit;
-  if (core::Property::StringToTime(strInactiveDurationToReconnect, 
inactiveDurationToReconnect_, unit) &&
-    core::Property::ConvertTimeUnitToMS(inactiveDurationToReconnect_, unit, 
inactiveDurationToReconnect_)) {
-    logger_->log_info("inactiveDurationToReconnect: [%lld] ms", 
inactiveDurationToReconnect_);
-  }
-
-  if (!pBookmark_) {
-    logger_->log_error("!pBookmark_");
+  if (!result) {
+    logger_->log_error("Invalid XML produced");
     return false;
   }
 
-  auto channel = std::wstring(channel_.begin(), channel_.end());
-  auto query = std::wstring(query_.begin(), query_.end());
+  // this is a well known path. 
+  std::string providerName = 
doc.child("Event").child("System").child("Provider").attribute("Name").value();
+  wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), 
channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
 
-  do {
-    auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), 
EvtQueryChannelPath);
-    if (!hEventResults) {
-      logger_->log_error("!EvtQuery error: %d.", GetLastError());
-      // Consider it as a serious error.
-      return false;
-    }
-    const utils::ScopeGuard guard_hEventResults([hEventResults]() { 
EvtClose(hEventResults); });
+  // resolve the event metadata
+  doc.traverse(walker);
 
-    if (pBookmark_->hasBookmarkXml()) {
-      if (!processEventsAfterBookmark(hEventResults, channel, query)) {
-        break;
-      }
-    } else {
-      // Seek to the last event in the hEventResults.
-      if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
-        logger_->log_error("!EvtSeek error: %d.", GetLastError());
-        break;
-      }
+  if (writePlainText_) {
+    auto handler = getEventLogHandler(providerName);
+    auto message = handler.getEventMessage(hEvent);
 
-      DWORD dwReturned{};
-      EVT_HANDLE hEvent{};
-      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-        logger_->log_error("!EvtNext error: %d.", GetLastError());
-        break;
+    if (!message.empty()) {
+
+      for (const auto &mapEntry : walker.getIdentifiers()) {
+        // replace the identifiers with their translated strings.
+        utils::StringUtils::replaceAll(message, mapEntry.first, 
mapEntry.second);
       }
+      wel::WindowsEventLogHeader log_header(header_names_);
+      // set the delimiter
+      log_header.setDelimiter(header_delimiter_);
+      // render the header.
+      renderedData.rendered_text_ = log_header.getEventHeader(&walker);
+      renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
+      renderedData.rendered_text_ += message;
+    }
+  }
 
-      pBookmark_->saveBookmark(hEvent);
+  if (writeXML_) {
+    substituteXMLPercentageItems(doc);
+
+    if (resolve_as_attributes_) {
+      renderedData.matched_fields_ = walker.getFieldValues();
     }
-  } while (false);
-
-  subscriptionHandle_ = EvtSubscribe(
-      NULL,
-      NULL,
-      channel.c_str(),
-      query.c_str(),
-      NULL,
-      this,
-      [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent)
-      {
-        auto pConsumeWindowsEventLog = 
static_cast<ConsumeWindowsEventLog*>(pContext);
-
-        auto& logger = pConsumeWindowsEventLog->logger_;
-
-        if (action == EvtSubscribeActionError) {
-          if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)hEvent) {
-            logger->log_error("Received missing event notification. Consider 
triggering processor more frequently or increasing queue size.");
-          } else {
-            logger->log_error("Received the following Win32 error: %x", 
hEvent);
-          }
-        } else if (action == EvtSubscribeActionDeliver) {
-          pConsumeWindowsEventLog->processEvent(hEvent);
-        }
 
-        return 0UL;
-      },
-      EvtSubscribeToFutureEvents | EvtSubscribeStrict);
+    wel::XmlString writer;
+    doc.print(writer, "", pugi::format_raw); // no indentation or formatting
+    xml = writer.xml_;
 
-  if (!subscriptionHandle_) {
-    logger_->log_error("Unable to subscribe with provided parameters, received 
the following error code: %d", GetLastError());
-    return false;
+    renderedData.text_ = std::move(xml);
   }
 
-  lastActivityTimestamp_ = GetTickCount64();
-
   return true;
 }
 
-void ConsumeWindowsEventLog::unsubscribe()
-{
-  if (subscriptionHandle_) {
-    EvtClose(subscriptionHandle_);
-    subscriptionHandle_ = 0;
-  }
-}
-
-int ConsumeWindowsEventLog::processQueue(const 
std::shared_ptr<core::ProcessSession> &session)
+void ConsumeWindowsEventLog::processEventRender(const EventRender& 
renderedData, core::ProcessSession& session)
 {
-  struct WriteCallback: public OutputStreamCallback {
+  struct WriteCallback : public OutputStreamCallback {
     WriteCallback(const std::string& str)
-      : data_(str.c_str()), size_(str.length()) {
+      : str_(str.c_str()) {
 
 Review comment:
   Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to