This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 5a9c1c33407e6a54b8de828b4620adeac659d814 Author: Ferenc Gerlits <[email protected]> AuthorDate: Fri Feb 17 01:11:26 2023 +0100 MINIFICPP-2034 Cache SID lookups in CWEL The SID -> Username lookup is a system call, which can be quite slow. As this mapping is unlikely to change without a Windows (and so MiNiFi) restart, we should cache it. Caching is quite simple: there is a hard-coded 24 hour cache expiry, the cache is in memory only, and the only way to clear it is to restart MiNiFi. We can improve this later if a user asks for it. Closes #1502 Signed-off-by: Marton Szasz <[email protected]> --- PROCESSORS.md | 1 + .../windows-event-log/ConsumeWindowsEventLog.cpp | 25 ++++++- .../windows-event-log/ConsumeWindowsEventLog.h | 7 +- extensions/windows-event-log/tests/CMakeLists.txt | 6 +- .../tests/ConsumeWindowsEventLogTests.cpp | 3 +- .../windows-event-log/tests/LookupCacherTests.cpp | 77 ++++++++++++++++++++++ .../tests/MetadataWalkerTests.cpp | 24 ++----- extensions/windows-event-log/wel/LookupCacher.cpp | 37 +++++++++++ extensions/windows-event-log/wel/LookupCacher.h | 48 ++++++++++++++ .../windows-event-log/wel/MetadataWalker.cpp | 5 +- extensions/windows-event-log/wel/MetadataWalker.h | 10 +-- 11 files changed, 211 insertions(+), 32 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 2e91293ee..a1878df3e 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -407,6 +407,7 @@ In the list below, the names of required properties appear in bold. Any other pr | Batch Commit Size | 1000 | | Maximum number of Events to consume and create to Flow Files from before committing. [...] | **Process Old Events** | false | true<br>false | This property defines if old events (which are created before first time server is started) should be processed. [...] | State Directory | CWELState | | DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory. [...] 
+| Cache SID Lookups | true | true<br>false | Determines whether SID to name lookups are cached in memory [...] ### Relationships diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp index fa48bf2ac..c1e93688a 100644 --- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp +++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp @@ -33,6 +33,7 @@ #include <regex> #include <cinttypes> +#include "wel/LookupCacher.h" #include "wel/MetadataWalker.h" #include "wel/XMLString.h" #include "wel/UnicodeConversion.h" @@ -173,6 +174,13 @@ const core::Property ConsumeWindowsEventLog::ProcessOldEvents( withDescription("This property defines if old events (which are created before first time server is started) should be processed.")-> build()); +const core::Property ConsumeWindowsEventLog::CacheSidLookups( + core::PropertyBuilder::createProperty("Cache SID Lookups")-> + isRequired(false)-> + withDefaultValue<bool>(true)-> + withDescription("Determines whether SID to name lookups are cached in memory")-> + build()); + const core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events."); ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, const utils::Identifier& uuid) @@ -324,6 +332,9 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte context->getProperty(MaxBufferSize.getName(), max_buffer_size_); logger_->log_debug("ConsumeWindowsEventLog: MaxBufferSize %" PRIu64, max_buffer_size_); + context->getProperty(CacheSidLookups.getName(), cache_sid_lookups_); + logger_->log_debug("ConsumeWindowsEventLog: will%s cache SID to name lookups", cache_sid_lookups_ ? "" : " not"); + provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" 
+ query; logger_->log_trace("Successfully configured CWEL"); } @@ -488,7 +499,7 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do for (size_t numberPos = 0; std::string::npos != (numberPos = nodeText.find(percentages, numberPos));) { numberPos += percentages.size(); - uint64_t number{}; + DWORD number{}; try { // Assumption - first character is not '0', otherwise not all digits will be replaced by 'value'. number = std::stoul(&nodeText[numberPos]); @@ -593,7 +604,7 @@ nonstd::expected<EventRender, std::string> ConsumeWindowsEventLog::createEventRe // this is a well known path. std::string provider_name = doc.child("Event").child("System").child("Provider").attribute("Name").value(); wel::WindowsEventLogMetadataImpl metadata{getEventLogHandler(provider_name).getMetadata(), hEvent}; - wel::MetadataWalker walker{metadata, channel_, !resolve_as_attributes_, apply_identifier_function_, regex_ ? &*regex_ : nullptr}; + wel::MetadataWalker walker{metadata, channel_, !resolve_as_attributes_, apply_identifier_function_, regex_ ? 
&*regex_ : nullptr, userIdToUsernameFunction()}; // resolve the event metadata doc.traverse(walker); @@ -752,6 +763,16 @@ void ConsumeWindowsEventLog::LogWindowsError(const std::string& error) const { LocalFree(lpMsg); } +std::function<std::string(const std::string&)> ConsumeWindowsEventLog::userIdToUsernameFunction() const { + static constexpr auto lookup = &utils::OsUtils::userIdToUsername; + if (cache_sid_lookups_) { + static auto cached_lookup = wel::LookupCacher{lookup}; + return std::ref(cached_lookup); + } else { + return lookup; + } +} + REGISTER_RESOURCE(ConsumeWindowsEventLog, Processor); } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h index 258846b1e..5a3a5b908 100644 --- a/extensions/windows-event-log/ConsumeWindowsEventLog.h +++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h @@ -79,6 +79,7 @@ class ConsumeWindowsEventLog : public core::Processor { EXTENSIONAPI static const core::Property BatchCommitSize; EXTENSIONAPI static const core::Property BookmarkRootDirectory; EXTENSIONAPI static const core::Property ProcessOldEvents; + EXTENSIONAPI static const core::Property CacheSidLookups; static auto properties() { return std::array{ Channel, @@ -94,7 +95,8 @@ class ConsumeWindowsEventLog : public core::Processor { JSONFormat, BatchCommitSize, BookmarkRootDirectory, - ProcessOldEvents + ProcessOldEvents, + CacheSidLookups }; } @@ -121,6 +123,7 @@ class ConsumeWindowsEventLog : public core::Processor { void LogWindowsError(const std::string& error = "Error") const; nonstd::expected<EventRender, std::string> createEventRender(EVT_HANDLE eventHandle); void substituteXMLPercentageItems(pugi::xml_document& doc); + std::function<std::string(const std::string&)> userIdToUsernameFunction() const; nonstd::expected<std::string, std::string> renderEventAsXml(EVT_HANDLE event_handle); @@ -132,7 +135,6 @@ class ConsumeWindowsEventLog : 
public core::Processor { static constexpr const char* JSONSimple = "Simple"; static constexpr const char* JSONFlattened = "Flattened"; - private: struct TimeDiff { auto operator()() const { return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() }; @@ -163,6 +165,7 @@ class ConsumeWindowsEventLog : public core::Processor { uint64_t max_buffer_size_{}; std::map<std::string, wel::WindowsEventLogHandler> providers_; uint64_t batch_commit_size_{}; + bool cache_sid_lookups_ = true; SMART_ENUM(JSONType, (None, "None"), diff --git a/extensions/windows-event-log/tests/CMakeLists.txt b/extensions/windows-event-log/tests/CMakeLists.txt index b9fe53737..a88a3c212 100644 --- a/extensions/windows-event-log/tests/CMakeLists.txt +++ b/extensions/windows-event-log/tests/CMakeLists.txt @@ -17,17 +17,17 @@ # under the License. # -set(WEL_INTEGRATION_TESTS "BookmarkTests.cpp" "ConsumeWindowsEventLogTests.cpp" "MetadataWalkerTests.cpp") +set(WEL_TESTS "BookmarkTests.cpp" "ConsumeWindowsEventLogTests.cpp" "LookupCacherTests.cpp" "MetadataWalkerTests.cpp") if (TEST_CUSTOM_WEL_PROVIDER) execute_process(COMMAND "${CMAKE_CURRENT_LIST_DIR}/custom-provider/generate-and-register.bat" "${CMAKE_CURRENT_LIST_DIR}/custom-provider" ) - list(APPEND WEL_INTEGRATION_TESTS "CWELCustomProviderTests.cpp") + list(APPEND WEL_TESTS "CWELCustomProviderTests.cpp") endif() SET(WEL_TEST_COUNT 0) -FOREACH(testfile ${WEL_INTEGRATION_TESTS}) +FOREACH(testfile ${WEL_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) add_executable("${testfilename}" "${testfile}") target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/windows-event-log/") diff --git a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp index c5f9d052d..44a2f125f 100644 --- a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp +++ 
b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp @@ -97,7 +97,8 @@ TEST_CASE("ConsumeWindowsEventLog properties work with default values", "[create ConsumeWindowsEventLog::OutputFormat, ConsumeWindowsEventLog::BatchCommitSize, ConsumeWindowsEventLog::BookmarkRootDirectory, // TODO(fgerlits): obsolete, see definition; remove in a later release - ConsumeWindowsEventLog::ProcessOldEvents + ConsumeWindowsEventLog::ProcessOldEvents, + ConsumeWindowsEventLog::CacheSidLookups }; for (const core::Property& property : properties_required_or_with_default_value) { if (!LogTestController::getInstance().contains("property name " + property.getName() + " value ")) { diff --git a/extensions/windows-event-log/tests/LookupCacherTests.cpp b/extensions/windows-event-log/tests/LookupCacherTests.cpp new file mode 100644 index 000000000..1bcd41058 --- /dev/null +++ b/extensions/windows-event-log/tests/LookupCacherTests.cpp @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <thread> + +#include "Catch.h" +#include "wel/LookupCacher.h" + +namespace wel = org::apache::nifi::minifi::wel; + +namespace { +struct DoubleTheInput { + std::string operator()(const std::string& key) { + keys_queried.push_back(key); + return key + key; + } + + std::vector<std::string> keys_queried; +}; +} + +TEST_CASE("LookupCacher can do lookups") { + DoubleTheInput lookup; + wel::LookupCacher lookup_cacher{std::ref(lookup)}; + + CHECK(lookup_cacher("foo") == "foofoo"); + CHECK(lookup_cacher("bar") == "barbar"); + CHECK(lookup_cacher("baa") == "baabaa"); + CHECK(lookup.keys_queried == std::vector<std::string>{"foo", "bar", "baa"}); +} + +TEST_CASE("LookupCacher caches the lookups") { + DoubleTheInput lookup; + wel::LookupCacher lookup_cacher{std::ref(lookup)}; + CHECK(lookup.keys_queried.empty()); + + lookup_cacher("foo"); + CHECK(lookup.keys_queried.size() == 1); + + lookup_cacher("foo"); + CHECK(lookup.keys_queried.size() == 1); + + lookup_cacher("foo"); + CHECK(lookup.keys_queried.size() == 1); +} + +TEST_CASE("LookupCacher lookups can expire") { + using namespace std::literals::chrono_literals; + DoubleTheInput lookup; + wel::LookupCacher lookup_cacher{std::ref(lookup), 10ms}; + CHECK(lookup.keys_queried.empty()); + + lookup_cacher("foo"); + CHECK(lookup.keys_queried.size() == 1); + + lookup_cacher("foo"); + CHECK(lookup.keys_queried.size() == 1); + + std::this_thread::sleep_for(20ms); + + lookup_cacher("foo"); + CHECK(lookup.keys_queried.size() == 2); +} diff --git a/extensions/windows-event-log/tests/MetadataWalkerTests.cpp b/extensions/windows-event-log/tests/MetadataWalkerTests.cpp index ee0339cfe..4dec8a544 100644 --- a/extensions/windows-event-log/tests/MetadataWalkerTests.cpp +++ b/extensions/windows-event-log/tests/MetadataWalkerTests.cpp @@ -22,6 +22,7 @@ #include "TestBase.h" #include "Catch.h" #include "core/Core.h" +#include "utils/OsUtils.h" #include "wel/MetadataWalker.h" #include "wel/XMLString.h" #include "pugixml.hpp" @@ 
-36,7 +37,7 @@ namespace { std::string updateXmlMetadata(const std::string &xml, EVT_HANDLE metadata_ptr, EVT_HANDLE event_ptr, bool update_xml, bool resolve, utils::Regex const* regex = nullptr) { WindowsEventLogMetadataImpl metadata{metadata_ptr, event_ptr}; - MetadataWalker walker(metadata, "", update_xml, resolve, regex); + MetadataWalker walker(metadata, "", update_xml, resolve, regex, &utils::OsUtils::userIdToUsername); pugi::xml_document doc; pugi::xml_parse_result result = doc.load_string(xml.c_str()); @@ -118,24 +119,13 @@ TEST_CASE("MetadataWalker will leave a Sid unchanged if it doesn't correspond to TEST_CASE("MetadataWalker can replace multiple Sids", "[updateXmlMetadata]") { std::string xml = readFile("resources/multiplesids.xml"); - std::string programmaticallyResolved; - pugi::xml_document doc; xml = updateXmlMetadata(xml, nullptr, nullptr, false, true); pugi::xml_parse_result result = doc.load_string(xml.c_str()); + REQUIRE(result); - for (const auto &node : doc.child("Event").child("EventData").children()) { - auto name = node.attribute("Name").as_string(); - if (utils::StringUtils::equalsIgnoreCase("GroupMembership", name)) { - programmaticallyResolved = node.text().get(); - break; - } - } - - std::string expected = "Nobody Everyone Null Authority"; - - // we are only testing mulitiple sid resolutions, not the resolution of other items. - REQUIRE(expected == programmaticallyResolved); + // we are only testing multiple sid resolutions, not the resolution of other items. 
+ CHECK(std::string_view("Nobody Everyone Null Authority") == doc.select_node("Event/EventData/Data[@Name='GroupMembership']").node().text().get()); } namespace { @@ -150,10 +140,10 @@ void extractMappingsTestHelper(const std::string &file_name, REQUIRE(!input_xml.empty()); pugi::xml_document doc; pugi::xml_parse_result result = doc.load_string(input_xml.c_str()); - CHECK(result); + REQUIRE(result); auto regex = utils::Regex(".*Sid"); - MetadataWalker walker(FakeWindowsEventLogMetadata{}, METADATA_WALKER_TESTS_LOG_NAME, update_xml, resolve, &regex); + MetadataWalker walker(FakeWindowsEventLogMetadata{}, METADATA_WALKER_TESTS_LOG_NAME, update_xml, resolve, &regex, &utils::OsUtils::userIdToUsername); doc.traverse(walker); CHECK(walker.getIdentifiers() == expected_identifiers); diff --git a/extensions/windows-event-log/wel/LookupCacher.cpp b/extensions/windows-event-log/wel/LookupCacher.cpp new file mode 100644 index 000000000..fdb240a8a --- /dev/null +++ b/extensions/windows-event-log/wel/LookupCacher.cpp @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +#include "LookupCacher.h" + +namespace org::apache::nifi::minifi::wel { + +std::string LookupCacher::operator()(const std::string& key) { + { + std::lock_guard<std::mutex> lock{mutex_}; + const auto it = cache_.find(key); + if (it != cache_.end() && it->second.expiry > std::chrono::system_clock::now()) { + return it->second.value; + } + } + + std::string value = lookup_function_(key); + + std::lock_guard<std::mutex> lock{mutex_}; + cache_.insert_or_assign(key, CacheItem{value, std::chrono::system_clock::now() + lifetime_}); + return value; +} + +} // namespace org::apache::nifi::minifi::wel diff --git a/extensions/windows-event-log/wel/LookupCacher.h b/extensions/windows-event-log/wel/LookupCacher.h new file mode 100644 index 000000000..7122c865c --- /dev/null +++ b/extensions/windows-event-log/wel/LookupCacher.h @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <chrono> +#include <functional> +#include <mutex> +#include <string> +#include <unordered_map> +#include <utility> + +namespace org::apache::nifi::minifi::wel { + +class LookupCacher { + public: + explicit LookupCacher(std::function<std::string(const std::string&)> lookup_function, std::chrono::milliseconds lifetime = std::chrono::hours{24}) + : lookup_function_(std::move(lookup_function)), + lifetime_(lifetime) {} + std::string operator()(const std::string& key); + + private: + struct CacheItem { + std::string value; + std::chrono::system_clock::time_point expiry; + }; + + std::mutex mutex_; + std::function<std::string(const std::string&)> lookup_function_; + std::chrono::milliseconds lifetime_; + std::unordered_map<std::string, CacheItem> cache_; +}; + +} // namespace org::apache::nifi::minifi::wel diff --git a/extensions/windows-event-log/wel/MetadataWalker.cpp b/extensions/windows-event-log/wel/MetadataWalker.cpp index 8b5ca7c5e..2181db17c 100644 --- a/extensions/windows-event-log/wel/MetadataWalker.cpp +++ b/extensions/windows-event-log/wel/MetadataWalker.cpp @@ -16,7 +16,6 @@ * limitations under the License. 
*/ -#include <windows.h> #include <strsafe.h> #include <map> @@ -39,7 +38,7 @@ bool MetadataWalker::for_each(pugi::xml_node &node) { for (pugi::xml_attribute attr : node.attributes()) { const auto idUpdate = [&](const std::string &input) { if (resolve_) { - auto resolved = utils::OsUtils::userIdToUsername(input); + auto resolved = user_id_to_username_fn_(input); replaced_identifiers_[input] = resolved; return resolved; } @@ -61,7 +60,7 @@ bool MetadataWalker::for_each(pugi::xml_node &node) { std::string nodeText = node.text().get(); std::vector<std::string> ids = getIdentifiers(nodeText); for (const auto &id : ids) { - auto resolved = utils::OsUtils::userIdToUsername(id); + auto resolved = user_id_to_username_fn_(id); std::string replacement = "%{" + id + "}"; replaced_identifiers_[id] = resolved; replaced_identifiers_[replacement] = resolved; diff --git a/extensions/windows-event-log/wel/MetadataWalker.h b/extensions/windows-event-log/wel/MetadataWalker.h index 014dadbdb..a2d949338 100644 --- a/extensions/windows-event-log/wel/MetadataWalker.h +++ b/extensions/windows-event-log/wel/MetadataWalker.h @@ -23,7 +23,7 @@ #include <Windows.h> #include <winevt.h> #include <codecvt> - +#include <functional> #include <map> #include <sstream> #include <string> @@ -34,7 +34,6 @@ #include "core/Core.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "utils/OsUtils.h" #include "FlowFileRecord.h" #include "WindowsEventLog.h" @@ -50,12 +49,14 @@ namespace org::apache::nifi::minifi::wel { */ class MetadataWalker : public pugi::xml_tree_walker { public: - MetadataWalker(const WindowsEventLogMetadata& windows_event_log_metadata, std::string log_name, bool update_xml, bool resolve, utils::Regex const* regex) + MetadataWalker(const WindowsEventLogMetadata& windows_event_log_metadata, std::string log_name, bool update_xml, bool resolve, utils::Regex const* regex, + std::function<std::string(std::string)> user_id_to_username_fn) : 
windows_event_log_metadata_(windows_event_log_metadata), log_name_(std::move(log_name)), regex_(regex), update_xml_(update_xml), - resolve_(resolve) { + resolve_(resolve), + user_id_to_username_fn_(std::move(user_id_to_username_fn)) { } /** @@ -93,6 +94,7 @@ class MetadataWalker : public pugi::xml_tree_walker { utils::Regex const * const regex_; const bool update_xml_; const bool resolve_; + std::function<std::string(const std::string&)> user_id_to_username_fn_; std::map<std::string, std::string> metadata_; std::map<std::string, std::string> fields_values_; std::map<std::string, std::string> replaced_identifiers_;
