fgerlits commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r482939199
##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const
std::shared_ptr<core::ProcessConte
logger_->log_trace("Successfully configured CWEL");
}
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring
&bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+ {
+ const TimeDiff time_diff;
+ session->commit();
+ logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+ }
+
+ if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+ logger_->log_error("Failed to save bookmark xml");
+ }
+
+ if (session->outgoingConnectionsFull("success")) {
+ logger_->log_debug("Outgoing success connection is full");
+ return false;
+ }
+
+ return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session,
+ size_t& processed_event_count, const EVT_HANDLE& event_query_results) {
+ std::wstring bookmark_xml;
+ logger_->log_trace("Enumerating the events in the result set after the
bookmarked event.");
+ while (processed_event_count < batch_commit_size_ || batch_commit_size_ ==
0) {
+ EVT_HANDLE next_event{};
+ DWORD handles_set_count{};
+ if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0,
&handles_set_count)) {
+ if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+ LogWindowsError("Failed to get next event");
+ continue;
+ /* According to MS this iteration should only end when the return
value is false AND
+ the error code is NO_MORE_ITEMS. See the following page for further
details:
+
https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+ }
+ break;
+ }
+
+ const auto guard_next_event = gsl::finally([next_event]() {
EvtClose(next_event); });
+ logger_->log_trace("Succesfully got the next event, performing event
rendering");
+ EventRender event_render;
+ std::wstring new_bookmark_xml;
+ if (createEventRender(next_event, event_render) &&
bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+ bookmark_xml = std::move(new_bookmark_xml);
+ processed_event_count++;
+ putEventRenderFlowFileToSession(event_render, *session);
+ }
+ }
+ logger_->log_trace("Finished enumerating events.");
+ return bookmark_xml;
+}
void ConsumeWindowsEventLog::onTrigger(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) {
- if (!pBookmark_) {
- logger_->log_debug("pBookmark_ is null");
+ if (!bookmark_) {
+ logger_->log_debug("bookmark_ is null");
context->yield();
return;
}
- std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+ std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
if (!lock.owns_lock()) {
logger_->log_warn("processor was triggered before previous listing
finished, configuration should be revised!");
return;
}
logger_->log_trace("CWEL onTrigger");
- struct TimeDiff {
- auto operator()() const {
- return int64_t{
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- time_).count() };
- }
- const decltype(std::chrono::steady_clock::now()) time_ =
std::chrono::steady_clock::now();
- };
-
- const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
- const TimeDiff timeDiff;
- session->commit();
- logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
- const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
- if (!successful_save) {
- logger_->log_error("Failed to save bookmark xml");
- }
-
- if (session->outgoingConnectionsFull("success")) {
- logger_->log_debug("outgoingConnectionsFull");
- return false;
- }
-
- return true;
- };
-
- size_t eventCount = 0;
- const TimeDiff timeDiff;
+ size_t processed_event_count = 0;
+ const TimeDiff time_diff;
const auto timeGuard = gsl::finally([&]() {
- logger_->log_debug("processed %zu Events in %" PRId64 " ms", eventCount,
timeDiff());
+ logger_->log_debug("processed %zu Events in %" PRId64 " ms",
processed_event_count, time_diff());
});
- size_t commitAndSaveBookmarkCount = 0;
- std::wstring bookmarkXml;
-
- const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(),
wstrQuery_.c_str(), EvtQueryChannelPath);
- if (!hEventResults) {
+ const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(),
wstrQuery_.c_str(), EvtQueryChannelPath);
+ if (!event_query_results) {
LOG_LAST_ERROR(EvtQuery);
context->yield();
return;
}
- const auto guard_hEventResults = gsl::finally([hEventResults]() {
EvtClose(hEventResults); });
+ const auto guard_event_query_results = gsl::finally([event_query_results]()
{ EvtClose(event_query_results); });
logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls",
wstrChannel_.c_str(), wstrQuery_.c_str());
- auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
- if (!hBookmark) {
- logger_->log_error("hBookmark is null, unrecoverable error!");
- pBookmark_.reset();
+ auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+ if (!bookmark_handle) {
+ logger_->log_error("bookmark_handle is null, unrecoverable error!");
+ bookmark_.reset();
context->yield();
return;
}
- if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+ if (!EvtSeek(event_query_results, 1, bookmark_handle, 0,
EvtSeekRelativeToBookmark)) {
LOG_LAST_ERROR(EvtSeek);
context->yield();
return;
}
refreshTimeZoneData();
+ auto bookmark_xml = processEventLogs(context,session, processed_event_count,
event_query_results);
- logger_->log_trace("Enumerating the events in the result set after the
bookmarked event.");
- while (true) {
- EVT_HANDLE hEvent{};
- DWORD dwReturned{};
- if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0,
&dwReturned)) {
- if (ERROR_NO_MORE_ITEMS != GetLastError()) {
- LogWindowsError("Failed to get next event");
- continue;
- /* According to MS this iteration should only end when the return
value is false AND
- the error code is NO_MORE_ITEMS. See the following page for further
details:
-
https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
- }
- break;
- }
- const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
- logger_->log_trace("Succesfully get the next hEvent, performing event
rendering");
- EventRender eventRender;
- std::wstring newBookmarkXml;
- if (createEventRender(hEvent, eventRender) &&
pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
- bookmarkXml = std::move(newBookmarkXml);
- eventCount++;
- putEventRenderFlowFileToSession(eventRender, *session);
-
- if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
- if (!commitAndSaveBookmark(bookmarkXml)) {
- context->yield();
- return;
- }
-
- commitAndSaveBookmarkCount = eventCount;
- }
- }
- }
-
- logger_->log_trace("Finish enumerating events.");
-
- if (eventCount > commitAndSaveBookmarkCount) {
- commitAndSaveBookmark(bookmarkXml);
+ if (processed_event_count > 0 && !commitAndSaveBookmark(bookmark_xml,
session)) {
Review comment:
I think we should yield if no events were processed, too. I know we
didn't before this change, but that seems like a bug.
```suggestion
if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml,
session)) {
```
##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const
std::shared_ptr<core::ProcessConte
logger_->log_trace("Successfully configured CWEL");
}
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring
&bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+ {
+ const TimeDiff time_diff;
+ session->commit();
+ logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+ }
+
+ if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+ logger_->log_error("Failed to save bookmark xml");
+ }
+
+ if (session->outgoingConnectionsFull("success")) {
+ logger_->log_debug("Outgoing success connection is full");
+ return false;
+ }
+
+ return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session,
+ size_t& processed_event_count, const EVT_HANDLE& event_query_results) {
Review comment:
I would do this the other way round: return the number of events
processed, and use an out parameter for the bookmark XML. Not important, but I
think that is a more common pattern.
Another option would be to return a struct `{ size_t processed_event_count;
std::wstring bookmark_xml; }`.
##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -367,27 +372,19 @@ void batchCommitSizeTestHelper(int batch_commit_size, int
expected_num_commits)
test_plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
- {
- reportEvent(APPLICATION_CHANNEL, "Event one");
- reportEvent(APPLICATION_CHANNEL, "Event two");
- reportEvent(APPLICATION_CHANNEL, "Event three");
- reportEvent(APPLICATION_CHANNEL, "Event four");
- reportEvent(APPLICATION_CHANNEL, "Event five");
-
- test_controller.runSession(test_plan);
-
- REQUIRE(LogTestController::getInstance().countOccurrences("processQueue
commit") == expected_num_commits);
- }
+ std::vector<std::string> events{"Event one", "Event two", "Event three",
"Event four", "Event five"};
+ std::for_each(events.begin(), events.end(), [](const std::string& event){
reportEvent(APPLICATION_CHANNEL, event.c_str()); });
+ test_controller.runSession(test_plan);
+ auto expected_event_count = events.size() <= batch_commit_size ||
batch_commit_size == 0 ? events.size() : batch_commit_size;
+ REQUIRE(LogTestController::getInstance().contains("processed " +
std::to_string(expected_event_count) + " Events"));
}
} // namespace
TEST_CASE("ConsumeWindowsEventLog batch commit size works", "[onTrigger]") {
- batchCommitSizeTestHelper(1000, 1);
- batchCommitSizeTestHelper(5, 1);
- batchCommitSizeTestHelper(4, 2);
- batchCommitSizeTestHelper(3, 2);
- batchCommitSizeTestHelper(2, 3);
- batchCommitSizeTestHelper(1, 5);
- batchCommitSizeTestHelper(0, 1);
+ batchCommitSizeTestHelper(1000);
+ batchCommitSizeTestHelper(5);
+ batchCommitSizeTestHelper(4);
+ batchCommitSizeTestHelper(1);
+ batchCommitSizeTestHelper(0);
Review comment:
I think the test is clearer if you explicitly specify the expected
result, as before. Maybe the number of events could be a parameter, too, e.g.
`batchCommitSizeTestHelper(int num_events_read, int batch_size, int
num_events_processed)`.
```suggestion
batchCommitSizeTestHelper(5, 1000, 5);
batchCommitSizeTestHelper(5, 5, 5);
batchCommitSizeTestHelper(5, 4, 4);
batchCommitSizeTestHelper(5, 1, 1);
batchCommitSizeTestHelper(5, 0, 5);
```
##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -335,21 +335,26 @@ TEST_CASE("ConsumeWindowsEventLog prints events in XML
correctly", "[onTrigger]"
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains(R"(<Event
xmlns="http://schemas.microsoft.com/win/2004/08/events/event"><System><Provider
Name="Application"/>)"));
- REQUIRE(LogTestController::getInstance().contains(R"(<EventID
Qualifiers="0">14985</EventID><Level>4</Level><Task>0</Task><Keywords>0x80000000000000</Keywords><TimeCreated
SystemTime=")"));
+ REQUIRE(LogTestController::getInstance().contains(R"(<EventID
Qualifiers="0">14985</EventID>)"));
+ REQUIRE(LogTestController::getInstance().contains(R"(<Level>4</Level>)"));
+ REQUIRE(LogTestController::getInstance().contains(R"(<Task>0</Task>)"));
+
REQUIRE(LogTestController::getInstance().contains(R"(<Keywords>0x80000000000000</Keywords><TimeCreated
SystemTime=")"));
// the timestamp (when the event was published) goes here
REQUIRE(LogTestController::getInstance().contains(R"("/><EventRecordID>)"));
// the ID of the event goes here (a number)
-
REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID><Channel>Application</Channel><Computer>)"));
+ REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID>)"));
+
REQUIRE(LogTestController::getInstance().contains(R"(<Channel>Application</Channel><Computer>)"));
Review comment:
Why did you change this? This is now a weaker condition, allowing
additional elements, any order of the elements, multiple XML blocks each
containing some of these elements, etc.
##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const
std::shared_ptr<core::ProcessConte
logger_->log_trace("Successfully configured CWEL");
}
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring
&bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+ {
+ const TimeDiff time_diff;
+ session->commit();
+ logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+ }
+
+ if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+ logger_->log_error("Failed to save bookmark xml");
+ }
+
+ if (session->outgoingConnectionsFull("success")) {
+ logger_->log_debug("Outgoing success connection is full");
+ return false;
+ }
+
+ return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session,
+ size_t& processed_event_count, const EVT_HANDLE& event_query_results) {
Review comment:
Could this be `const`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]