This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new b635f33 MINIFICPP-773 Implemented.
b635f33 is described below
commit b635f33931a7367820517e584ebb1bc5ca8c56b7
Author: amarmer <amarmer@AMARMER-5530-85>
AuthorDate: Wed May 8 15:51:41 2019 -0700
MINIFICPP-773 Implemented.
MINIFICPP-773 Fixed EvtSubscribe parameter.
MINIFICPP-773 Fixed formatting.
MINIFICPP-773 Code review modifications.
This closes #551.
Signed-off-by: Marc Parisi <[email protected]>
---
.../windows-event-log/ConsumeWindowsEventLog.cpp | 306 +++++++++++++++++++++
.../windows-event-log/ConsumeWindowsEventLog.h | 102 +++++++
2 files changed, 408 insertions(+)
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
new file mode 100644
index 0000000..afc6d87
--- /dev/null
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -0,0 +1,306 @@
+/**
+ * @file ConsumeWindowsEventLog.cpp
+ * ConsumeWindowsEventLog class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeWindowsEventLog.h"
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+#pragma comment(lib, "wevtapi.lib")
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! Processor name constant.
+const std::string ConsumeWindowsEventLog::ProcessorName("ConsumeWindowsEventLog");
+
+//! Required property naming the event log channel to subscribe to (default "System").
+core::Property ConsumeWindowsEventLog::Channel(
+    core::PropertyBuilder::createProperty("Channel")->
+    isRequired(true)->
+    withDefaultValue("System")->
+    withDescription("The Windows Event Log Channel to listen to.")->
+    supportsExpressionLanguage(true)->
+    build());
+
+//! Required property holding the XPath filter passed to the subscription (default "*").
+core::Property ConsumeWindowsEventLog::Query(
+    core::PropertyBuilder::createProperty("Query")->
+    isRequired(true)->
+    withDefaultValue("*")->
+    withDescription("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)")->
+    supportsExpressionLanguage(true)->
+    build());
+
+//! Required property capping the rendered size of a single event XML (default "1 MB").
+core::Property ConsumeWindowsEventLog::MaxBufferSize(
+    core::PropertyBuilder::createProperty("Max Buffer Size")->
+    isRequired(true)->
+    withDefaultValue<core::DataSizeValue>("1 MB")->
+    withDescription(
+      "The individual Event Log XMLs are rendered to a buffer."
+      " This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.)")->
+    build());
+
+//! Required property: idle time after which the subscription is dropped and re-created (default "10 min").
+core::Property ConsumeWindowsEventLog::InactiveDurationToReconnect(
+    core::PropertyBuilder::createProperty("Inactive Duration To Reconnect")->
+    isRequired(true)->
+    withDefaultValue<core::TimePeriodValue>("10 min")->
+    withDescription(
+      "If no new event logs are processed for the specified time period, "
+      " this processor will try reconnecting to recover from a state where any further messages cannot be consumed."
+      " Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned."
+      " Setting no duration, e.g. '0 ms' disables auto-reconnection.")->
+    build());
+
+//! The single outgoing relationship; every emitted FlowFile is routed here.
+core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");
+
+/**
+ * Creates the processor and caches the local computer name, which is later
+ * embedded in the provenance URI built by subscribe().
+ * @param name processor name forwarded to core::Processor.
+ * @param uuid processor identifier forwarded to core::Processor.
+ */
+ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()) {
+  // The buffer is a narrow char array, so call the ANSI entry point explicitly.
+  // The TCHAR macro GetComputerName resolves to the wide-character variant in
+  // UNICODE builds and would not accept (or would mis-fill) this buffer.
+  char buff[MAX_COMPUTERNAME_LENGTH + 1];
+  DWORD size = sizeof(buff);
+  if (GetComputerNameA(buff, &size)) {
+    computerName_ = buff;
+  } else {
+    // computerName_ stays empty; the failure reason is written to the log.
+    LogWindowsError();
+  }
+}
+
+/**
+ * Registers the properties and the relationship this processor exposes.
+ */
+void ConsumeWindowsEventLog::initialize() {
+  // Configurable properties.
+  setSupportedProperties({
+    Channel,
+    Query,
+    MaxBufferSize,
+    InactiveDurationToReconnect
+  });
+
+  // Outgoing relationships.
+  setSupportedRelationships({Success});
+}
+
+/**
+ * Called when the processor is scheduled. Remembers the session factory
+ * (notifyStop() uses it to drain leftover events) and opens the subscription.
+ * @param context process context the properties are read from.
+ * @param sessionFactory factory stored for later session creation.
+ */
+void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  if (subscriptionHandle_) {
+    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
+    return;
+  }
+
+  sessionFactory_ = sessionFactory;
+
+  // The result is intentionally not checked: onTrigger() retries the
+  // subscription whenever no subscription handle is held.
+  subscribe(context);
+}
+
+/**
+ * Drains the queue of rendered events into FlowFiles and, when nothing has
+ * been consumed for longer than the configured inactive duration, drops the
+ * subscription so that the next trigger re-subscribes.
+ * @param context process context; yielded when subscribing fails.
+ * @param session session used to create and transfer the FlowFiles.
+ */
+void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  // (Re)connect lazily; back off if the subscription cannot be established.
+  if (!subscriptionHandle_ && !subscribe(context)) {
+    context->yield();
+    return;
+  }
+
+  const auto flowFileCount = processQueue(session);
+
+  const auto now = GetTickCount();
+
+  if (flowFileCount > 0) {
+    lastActivityTimestamp_ = now;
+    return;
+  }
+
+  // A non-positive duration disables auto-reconnection. The tick difference
+  // is computed in unsigned arithmetic on the two tick values.
+  if (inactiveDurationToReconnect_ > 0 && (now - lastActivityTimestamp_) > inactiveDurationToReconnect_) {
+    logger_->log_info("Exceeds configured 'inactive duration to reconnect' %lld ms. Unsubscribe to reconnect..", inactiveDurationToReconnect_);
+    unsubscribe();
+  }
+}
+
+/**
+ * Reads the processor properties and opens a push subscription for future
+ * events on the configured channel. Each delivered event is rendered to XML,
+ * converted to UTF-8 and queued in renderedXMLs_ for processQueue().
+ * @param context process context the properties are read from.
+ * @return true when the subscription handle was obtained, false otherwise.
+ */
+bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
+  std::string channel;
+  context->getProperty(Channel.getName(), channel);
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %llu", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel + "?" + query;
+
+  std::string strInactiveDurationToReconnect;
+  context->getProperty(InactiveDurationToReconnect.getName(), strInactiveDurationToReconnect);
+
+  // Get 'inactiveDurationToReconnect_' (parsed, then normalised to milliseconds).
+  core::TimeUnit unit;
+  if (core::Property::StringToTime(strInactiveDurationToReconnect, inactiveDurationToReconnect_, unit) &&
+      core::Property::ConvertTimeUnitToMS(inactiveDurationToReconnect_, unit, inactiveDurationToReconnect_)) {
+    logger_->log_info("inactiveDurationToReconnect: [%lld] ms", inactiveDurationToReconnect_);
+  }
+
+  // Element-wise widening of the property strings for the wide-character API.
+  const std::wstring wideChannel(channel.begin(), channel.end());
+  const std::wstring wideQuery(query.begin(), query.end());
+
+  subscriptionHandle_ = EvtSubscribe(
+      NULL,
+      NULL,
+      wideChannel.c_str(),
+      wideQuery.c_str(),
+      NULL,
+      this,
+      [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent) -> DWORD {
+        auto pConsumeWindowsEventLog = static_cast<ConsumeWindowsEventLog*>(pContext);
+
+        auto& logger = pConsumeWindowsEventLog->logger_;
+
+        if (action == EvtSubscribeActionError) {
+          // For error notifications the handle argument carries a Win32 error
+          // code rather than an event; narrow it without a pointer-truncating C cast.
+          const DWORD errorCode = static_cast<DWORD>(reinterpret_cast<ULONG_PTR>(hEvent));
+          if (ERROR_EVT_QUERY_RESULT_STALE == errorCode) {
+            logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size.");
+          } else {
+            logger->log_error("Received the following Win32 error: %x", errorCode);
+          }
+          return 0;
+        }
+
+        if (action != EvtSubscribeActionDeliver) {
+          return 0;
+        }
+
+        // No exception may propagate through the C callback boundary, so
+        // allocation/conversion failures are logged and the event is dropped.
+        try {
+          DWORD used = 0;
+          DWORD propertyCount = 0;
+
+          // First pass with an empty buffer only asks for the required size in bytes.
+          if (EvtRender(NULL, hEvent, EvtRenderEventXml, 0, NULL, &used, &propertyCount)) {
+            return 0;
+          }
+
+          const DWORD sizingError = GetLastError();
+          if (ERROR_INSUFFICIENT_BUFFER != sizingError) {
+            logger->log_error("EvtRender returned the following error code: %d.", sizingError);
+            return 0;
+          }
+
+          if (used > pConsumeWindowsEventLog->maxBufferSize_) {
+            logger->log_error("Dropping event %p because it couldn't be rendered within %llu bytes.", hEvent, pConsumeWindowsEventLog->maxBufferSize_);
+            return 0;
+          }
+
+          // Zero-initialised wide buffer, rounded up plus one extra element so
+          // the rendered text is always null-terminated.
+          std::vector<wchar_t> buf((used + sizeof(wchar_t) - 1) / sizeof(wchar_t) + 1);
+          const DWORD size = static_cast<DWORD>((buf.size() - 1) * sizeof(wchar_t));
+          if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, buf.data(), &used, &propertyCount)) {
+            logger->log_error("EvtRender returned the following error code: %d.", GetLastError());
+            return 0;
+          }
+
+          // wchar_t is UTF-16 on Windows; codecvt_utf8_utf16 handles surrogate
+          // pairs, which the UCS-2-only codecvt_utf8<wchar_t> does not.
+          std::string xml = std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>().to_bytes(buf.data());
+
+          pConsumeWindowsEventLog->renderedXMLs_.enqueue(std::move(xml));
+        } catch (...) {
+          logger->log_error("Dropping event %p because its XML could not be converted or queued.", hEvent);
+        }
+
+        return 0;
+      },
+      EvtSubscribeToFutureEvents | EvtSubscribeStrict);
+
+  if (!subscriptionHandle_) {
+    logger_->log_error("Unable to subscribe with provided parameters, received the following error code: %d", GetLastError());
+    return false;
+  }
+
+  // Start the inactivity clock from the moment the subscription is live.
+  lastActivityTimestamp_ = GetTickCount();
+
+  return true;
+}
+
+/**
+ * Closes the event log subscription if one is open and clears the stored handle.
+ */
+void ConsumeWindowsEventLog::unsubscribe() {
+  if (!subscriptionHandle_) {
+    return;  // Nothing to close.
+  }
+
+  EvtClose(subscriptionHandle_);
+  subscriptionHandle_ = EVT_HANDLE{};
+}
+
+/**
+ * Turns every queued event XML into a FlowFile: writes the XML as content,
+ * sets the MIME type attribute, records a provenance receive event, transfers
+ * the FlowFile to Success and commits the session.
+ * @param session session used to create, write and transfer the FlowFiles.
+ * @return number of FlowFiles created during this call.
+ */
+int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSession> &session) {
+  // Streams one XML document into the FlowFile content. It only borrows the
+  // string: the callback is used while the referenced string is still alive.
+  struct WriteCallback : public OutputStreamCallback {
+    explicit WriteCallback(const std::string& str)
+      : str_(str), status_(0) {
+    }
+
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      // writeData takes a non-const pointer; the data is only read from here.
+      auto len = stream->writeData(reinterpret_cast<uint8_t*>(const_cast<char*>(str_.data())), str_.size());
+      if (len < 0)
+        status_ = -1;
+      return len;
+    }
+
+    const std::string& str_;
+    int status_;
+  };
+
+  int flowFileCount = 0;
+
+  std::string xml;
+  while (renderedXMLs_.try_dequeue(xml)) {
+    auto flowFile = session->create();
+
+    // Use a named object: taking the address of a temporary is ill-formed in
+    // standard C++ and only compiles as a compiler extension.
+    WriteCallback writer(xml);
+    session->write(flowFile, &writer);
+    session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
+    session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
+    session->transfer(flowFile, Success);
+    session->commit();
+
+    ++flowFileCount;
+  }
+
+  return flowFileCount;
+}
+
+/**
+ * Stops receiving events and, if rendered events are still queued, tries to
+ * flush them through a freshly created session. When no session can be
+ * obtained the situation is logged, because the queued events cannot be processed.
+ */
+void ConsumeWindowsEventLog::notifyStop() {
+  unsubscribe();
+
+  if (renderedXMLs_.size_approx() == 0) {
+    return;
+  }
+
+  // sessionFactory_ is only assigned in onSchedule(); do not dereference it
+  // when the processor is stopped without ever having been scheduled.
+  std::shared_ptr<core::ProcessSession> session;
+  if (sessionFactory_) {
+    session = sessionFactory_->createSession();
+  }
+
+  if (!session) {
+    logger_->log_error(
+      "Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the internal queue. "
+      "Removing the processor now will clear the queue but will result in DATA LOSS. This is normally due to starting the processor, "
+      "receiving events and stopping before the onTrigger happens. The messages in the internal queue cannot finish processing until "
+      "the processor is triggered to run.");
+    return;
+  }
+
+  logger_->log_info("Finishing processing leftover events");
+
+  processQueue(session);
+}
+
+/**
+ * Logs the calling thread's last Win32 error code together with the system's
+ * textual description of it, when one can be obtained.
+ */
+void ConsumeWindowsEventLog::LogWindowsError() {
+  // Capture the code first, before any further API call can overwrite it.
+  const auto error_id = GetLastError();
+
+  // Must start out null: it is read and freed below only if FormatMessageA
+  // actually allocated a buffer.
+  LPSTR lpMsg = nullptr;
+
+  // ANSI variant so the buffer can be logged with %s regardless of UNICODE.
+  // IGNORE_INSERTS: system message texts may contain insert sequences for
+  // which no argument array is supplied here.
+  const DWORD length = FormatMessageA(
+      FORMAT_MESSAGE_ALLOCATE_BUFFER |
+      FORMAT_MESSAGE_FROM_SYSTEM |
+      FORMAT_MESSAGE_IGNORE_INSERTS,
+      NULL,
+      error_id,
+      MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+      reinterpret_cast<LPSTR>(&lpMsg),
+      0, NULL);
+
+  const char* description = (length != 0 && lpMsg != nullptr) ? lpMsg : "(no description available)";
+
+  logger_->log_error("Error %d: %s\n", static_cast<int>(error_id), description);
+
+  if (lpMsg != nullptr) {
+    LocalFree(lpMsg);
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h
b/extensions/windows-event-log/ConsumeWindowsEventLog.h
new file mode 100644
index 0000000..4a00245
--- /dev/null
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -0,0 +1,102 @@
+/**
+ * @file ConsumeWindowsEventLog.h
+ * ConsumeWindowsEventLog class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include <winevt.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! ConsumeWindowsEventLog Class
+/*!
+ * Processor that subscribes to a Windows Event Log channel, queues every
+ * delivered event rendered as XML, and emits one FlowFile per queued event.
+ */
+class ConsumeWindowsEventLog : public core::Processor
+{
+public:
+  //! Constructor
+  /*!
+   * Create a new processor
+   */
+  ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid = utils::Identifier());
+
+  //! Destructor
+  /*!
+   * Closes the subscription if it is still open, so the handle is not leaked
+   * and the subscription callback is not left holding a pointer to a
+   * destroyed object.
+   */
+  virtual ~ConsumeWindowsEventLog()
+  {
+    unsubscribe();
+  }
+
+  //! Processor Name
+  static const std::string ProcessorName;
+
+  //! Supported Properties
+  static core::Property Channel;
+  static core::Property Query;
+  static core::Property MaxBufferSize;
+  static core::Property InactiveDurationToReconnect;
+
+  //! Supported Relationships
+  static core::Relationship Success;
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  //! OnTrigger method, implemented by NiFi ConsumeWindowsEventLog
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  //! Initialize, overwrite by NiFi ConsumeWindowsEventLog
+  virtual void initialize(void) override;
+  //! Unsubscribes and attempts to flush events that are still queued
+  virtual void notifyStop() override;
+
+protected:
+  //! Reads the properties and opens the subscription; returns false on failure
+  bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
+  //! Closes the subscription handle if one is open
+  void unsubscribe();
+  //! Converts queued event XMLs into FlowFiles; returns how many were created
+  int processQueue(const std::shared_ptr<core::ProcessSession> &session);
+
+  //! Logs the last Win32 error code and its description
+  void LogWindowsError();
+private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Rendered event XMLs filled by the subscription callback, drained by processQueue()
+  moodycamel::ConcurrentQueue<std::string> renderedXMLs_;
+  // "winlog://<computer>/<channel>?<query>" URI reported with provenance events
+  std::string provenanceUri_;
+  // Local computer name fetched in the constructor
+  std::string computerName_;
+  // Idle time in milliseconds before reconnecting; values <= 0 disable it
+  int64_t inactiveDurationToReconnect_{};
+  // Handle returned by EvtSubscribe; null while not subscribed
+  EVT_HANDLE subscriptionHandle_{};
+  // Upper bound in bytes for a single rendered event XML
+  uint64_t maxBufferSize_{};
+  // GetTickCount() value taken when events were last consumed or on subscribe
+  DWORD lastActivityTimestamp_{};
+  // Session factory remembered in onSchedule(), used by notifyStop()
+  std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
+};
+
+// Registers the processor class together with its description text.
+REGISTER_RESOURCE(ConsumeWindowsEventLog, "Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows.");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */