This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new bcf8921 MINIFICPP-287: Add property for flow files to log
bcf8921 is described below
commit bcf8921b839c967a31c764ff29027b320c462634
Author: Marc Parisi <[email protected]>
AuthorDate: Tue May 14 15:26:05 2019 -0400
MINIFICPP-287: Add property for flow files to log
This closes #553.
Approved on GH By jdye64.
Signed-off-by: Marc Parisi <[email protected]>
---
.../processors/LogAttribute.cpp | 165 +++++++++++----------
.../standard-processors/processors/LogAttribute.h | 10 +-
.../tests/unit/ProcessorTests.cpp | 91 ++++++++++++
libminifi/include/core/PropertyValidation.h | 5 +
libminifi/include/core/state/Value.h | 13 ++
libminifi/src/core/PropertyValidation.cpp | 2 +
6 files changed, 208 insertions(+), 78 deletions(-)
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp
b/extensions/standard-processors/processors/LogAttribute.cpp
index 678d12c..a02bafe 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -39,12 +39,17 @@ namespace nifi {
namespace minifi {
namespace processors {
-core::Property
LogAttribute::LogLevel(core::PropertyBuilder::createProperty("Log
Level")->withDescription("The Log Level to use when logging the
Attributes")->withAllowableValues<std::string>({
+core::Property
LogAttribute::LogLevel(core::PropertyBuilder::createProperty("Log
Level")->withDescription("The Log Level to use when logging the
Attributes")->withAllowableValues<std::string>( {
"info", "trace", "error", "warn", "debug" })->build());
core::Property LogAttribute::AttributesToLog(
core::PropertyBuilder::createProperty("Attributes to
Log")->withDescription("A comma-separated list of Attributes to Log. If not
specified, all attributes will be logged.")->build());
+core::Property LogAttribute::FlowFilesToLog(
+ core::PropertyBuilder::createProperty("FlowFiles To Log")->withDescription(
+ "Number of flow files to log. If set to zero all flow files will be
logged. Please note that this may block other threads from running if not used
judiciously.")->withDefaultValue<uint64_t>(1)
+ ->build());
+
core::Property LogAttribute::AttributesToIgnore(
core::PropertyBuilder::createProperty("Attributes to
Ignore")->withDescription("A comma-separated list of Attributes to ignore. If
not specified, no attributes will be ignored.")->build());
@@ -63,6 +68,7 @@ void LogAttribute::initialize() {
properties.insert(AttributesToLog);
properties.insert(AttributesToIgnore);
properties.insert(LogPayload);
+ properties.insert(FlowFilesToLog);
properties.insert(LogPrefix);
setSupportedProperties(properties);
// Set the supported relationships
@@ -71,91 +77,100 @@ void LogAttribute::initialize() {
setSupportedRelationships(relationships);
}
-void LogAttribute::onTrigger(core::ProcessContext *context,
core::ProcessSession *session) {
- logger_->log_trace("enter log attribute");
+void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+ core::Property flowsToLog = FlowFilesToLog;
+
+ if (getProperty(FlowFilesToLog.getName(), flowsToLog)) {
+ // We take this route because, to avoid breaking backwards compatibility,
the get property function doesn't perform validation (that's done
+ // in configuration). In future releases we can add that exception handling
there.
+ if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid())
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles
to log: " + flowsToLog.getValue().to_string());
+ flowfiles_to_log_ = flowsToLog.getValue();
+ }
+}
+// OnTrigger method, implemented by NiFi LogAttribute
+void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSession> &session) {
+ logger_->log_trace("enter log attribute, attempting to retrieve %u flow
files", flowfiles_to_log_);
std::string dashLine = "--------------------------------------------------";
LogAttrLevel level = LogAttrLevelInfo;
bool logPayload = false;
std::ostringstream message;
- std::shared_ptr<core::FlowFile> flow = session->get();
+ uint64_t i = 0;
+ const auto max = flowfiles_to_log_ == 0 ? UINT64_MAX : flowfiles_to_log_;
+ for (; i < max; ++i) {
+ std::shared_ptr<core::FlowFile> flow = session->get();
- if (!flow) {
- return;
- }
+ if (!flow) {
+ break;
+ }
- std::string value;
- if (context->getProperty(LogLevel.getName(), value)) {
- logLevelStringToEnum(value, level);
- }
- if (context->getProperty(LogPrefix.getName(), value)) {
- dashLine = "-----" + value + "-----";
- }
+ std::string value;
+ if (context->getProperty(LogLevel.getName(), value)) {
+ logLevelStringToEnum(value, level);
+ }
+ if (context->getProperty(LogPrefix.getName(), value)) {
+ dashLine = "-----" + value + "-----";
+ }
- context->getProperty(LogPayload.getName(), logPayload);
-
- message << "Logging for flow file " << "\n";
- message << dashLine;
- message << "\nStandard FlowFile Attributes";
- message << "\n" << "UUID:" << flow->getUUIDStr();
- message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
- message << "\n" << "lineageStartDate:" <<
getTimeStr(flow->getlineageStartDate());
- message << "\n" << "Size:" << flow->getSize() << " Offset:" <<
flow->getOffset();
- message << "\nFlowFile Attributes Map Content";
- std::map<std::string, std::string> attrs = flow->getAttributes();
- std::map<std::string, std::string>::iterator it;
- for (it = attrs.begin(); it != attrs.end(); it++) {
- message << "\n" << "key:" << it->first << " value:" << it->second;
- }
- message << "\nFlowFile Resource Claim Content";
- std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
- if (claim) {
- message << "\n" << "Content Claim:" << claim->getContentFullPath();
- }
- if (logPayload && flow->getSize() <= 1024 * 1024) {
- message << "\n" << "Payload:" << "\n";
- ReadCallback callback(flow->getSize());
- session->read(flow, &callback);
- for (unsigned int i = 0, j = 0; i < callback.read_size_; i++) {
- message << std::hex << callback.buffer_[i];
- j++;
- if (j == 80) {
- message << '\n';
- j = 0;
+ context->getProperty(LogPayload.getName(), logPayload);
+
+ message << "Logging for flow file " << "\n";
+ message << dashLine;
+ message << "\nStandard FlowFile Attributes";
+ message << "\n" << "UUID:" << flow->getUUIDStr();
+ message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
+ message << "\n" << "lineageStartDate:" <<
getTimeStr(flow->getlineageStartDate());
+ message << "\n" << "Size:" << flow->getSize() << " Offset:" <<
flow->getOffset();
+ message << "\nFlowFile Attributes Map Content";
+ std::map<std::string, std::string> attrs = flow->getAttributes();
+ std::map<std::string, std::string>::iterator it;
+ for (it = attrs.begin(); it != attrs.end(); it++) {
+ message << "\n" << "key:" << it->first << " value:" << it->second;
+ }
+ message << "\nFlowFile Resource Claim Content";
+ std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
+ if (claim) {
+ message << "\n" << "Content Claim:" << claim->getContentFullPath();
+ }
+ if (logPayload && flow->getSize() <= 1024 * 1024) {
+ message << "\n" << "Payload:" << "\n";
+ ReadCallback callback(flow->getSize());
+ session->read(flow, &callback);
+ for (unsigned int i = 0, j = 0; i < callback.read_size_; i++) {
+ message << std::hex << callback.buffer_[i];
+ j++;
+ if (j == 80) {
+ message << '\n';
+ j = 0;
+ }
}
}
+ message << "\n" << dashLine << std::ends;
+ std::string output = message.str();
+
+ switch (level) {
+ case LogAttrLevelInfo:
+ logger_->log_info("%s", output);
+ break;
+ case LogAttrLevelDebug:
+ logger_->log_debug("%s", output);
+ break;
+ case LogAttrLevelError:
+ logger_->log_error("%s", output);
+ break;
+ case LogAttrLevelTrace:
+ logger_->log_trace("%s", output);
+ break;
+ case LogAttrLevelWarn:
+ logger_->log_warn("%s", output);
+ break;
+ default:
+ break;
+ }
+ session->transfer(flow, Success);
}
- message << "\n" << dashLine << std::ends;
- std::string output = message.str();
-
- switch (level) {
- case LogAttrLevelInfo:
- logger_->log_info("%s", output);
- break;
- case LogAttrLevelDebug:
- logger_->log_debug("%s", output);
- break;
- case LogAttrLevelError:
- logger_->log_error("%s", output);
- break;
- case LogAttrLevelTrace:
- logger_->log_trace("%s", output);
- break;
- case LogAttrLevelWarn:
- logger_->log_warn("%s", output);
- break;
- default:
- break;
- }
-
- // Test Import
- /*
- std::shared_ptr<FlowFileRecord> importRecord = session->create();
- session->import(claim->getContentFullPath(), importRecord);
- session->transfer(importRecord, Success); */
-
- // Transfer to the relationship
- session->transfer(flow, Success);
+ logger_->log_debug("Logged %d flow files", i);
}
} /* namespace processors */
diff --git a/extensions/standard-processors/processors/LogAttribute.h
b/extensions/standard-processors/processors/LogAttribute.h
index 1c5b2e4..5d03e11 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -40,8 +40,9 @@ class LogAttribute : public core::Processor {
/*!
* Create a new processor
*/
- LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
+ LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
: Processor(name, uuid),
+ flowfiles_to_log_(1),
logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
}
// Destructor
@@ -55,6 +56,7 @@ class LogAttribute : public core::Processor {
static core::Property AttributesToIgnore;
static core::Property LogPayload;
static core::Property LogPrefix;
+ static core::Property FlowFilesToLog;
// Supported Relationships
static core::Relationship Success;
enum LogAttrLevel {
@@ -111,14 +113,16 @@ class LogAttribute : public core::Processor {
};
public:
+ virtual void onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
// OnTrigger method, implemented by NiFi LogAttribute
- virtual void onTrigger(core::ProcessContext *context, core::ProcessSession
*session);
+ virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi LogAttribute
- virtual void initialize(void);
+ virtual void initialize(void) override;
protected:
private:
+ uint64_t flowfiles_to_log_;
// Logger
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index a319178..5d7edd6 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -251,6 +251,97 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
LogTestController::getInstance().reset();
}
+TEST_CASE("LogAttributeTestInvalid", "[TestLogAttribute]") {
+ TestController testController;
+
LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
+ LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile",
"getfileCreate2");
+
+ auto loggattr = plan->addProcessor("LogAttribute", "logattribute",
core::Relationship("success", "description"), true);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format);
+
+ plan->setProperty(getfile,
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+ plan->setProperty(getfile,
org::apache::nifi::minifi::processors::GetFile::BatchSize.getName(), "1");
+ REQUIRE_THROWS_AS( plan->setProperty(loggattr,
org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(),
"-1"),std::out_of_range);
+ LogTestController::getInstance().reset();
+}
+
+void testMultiplesLogAttribute(int fileCount, int flowsToLog, std::string
verifyStringFlowsLogged = ""){
+ TestController testController;
+
LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
+ LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile",
"getfileCreate2");
+
+ auto loggattr = plan->addProcessor("LogAttribute", "logattribute",
core::Relationship("success", "description"), true);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format);
+
+ auto flowsToLogStr = std::to_string(flowsToLog);
+ if ( verifyStringFlowsLogged.empty() )
+ verifyStringFlowsLogged = std::to_string(flowsToLog);
+ plan->setProperty(getfile,
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+ plan->setProperty(getfile,
org::apache::nifi::minifi::processors::GetFile::BatchSize.getName(),
std::to_string(fileCount));
+ plan->setProperty(loggattr,
org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(),
flowsToLogStr);
+ testController.runSession(plan, false);
+ auto records = plan->getProvenanceRecords();
+ std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+ REQUIRE(record == nullptr);
+ REQUIRE(records.size() == 0);
+
+ std::vector<std::string> files;
+
+ for (int i = 0; i < fileCount; i++) {
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile" << i << ".ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+
+
+ files.push_back(ss.str());
+ }
+
+ plan->reset();
+ testController.runSession(plan, false);
+
+ for (const auto &created_file : files) {
+ unlink(created_file.c_str());
+ }
+
+ records = plan->getProvenanceRecords();
+ record = plan->getCurrentFlowFile();
+ testController.runSession(plan, false);
+
+ records = plan->getProvenanceRecords();
+ record = plan->getCurrentFlowFile();
+
+ REQUIRE(true == LogTestController::getInstance().contains("Size:8
Offset:0"));
+ REQUIRE(true == LogTestController::getInstance().contains("key:path
value:" + std::string(dir)));
+ REQUIRE(true == LogTestController::getInstance().contains("Logged " +
verifyStringFlowsLogged + " flow files"));
+ LogTestController::getInstance().reset();
+}
+
+TEST_CASE("LogAttributeTestSingle", "[TestLogAttribute]") {
+ testMultiplesLogAttribute(1,3,"1");
+}
+
+
+TEST_CASE("LogAttributeTestMultiple", "[TestLogAttribute]") {
+ testMultiplesLogAttribute(5,3);
+}
+
+TEST_CASE("LogAttributeTestAll", "[TestLogAttribute]") {
+ testMultiplesLogAttribute(5,0,"5");
+}
+
TEST_CASE("Test Find file", "[getfileCreate3]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceReporter>();
diff --git a/libminifi/include/core/PropertyValidation.h
b/libminifi/include/core/PropertyValidation.h
index fc2a981..2938300 100644
--- a/libminifi/include/core/PropertyValidation.h
+++ b/libminifi/include/core/PropertyValidation.h
@@ -22,6 +22,7 @@
#include "core/state/Value.h"
#include "TypedValues.h"
#include "utils/StringUtils.h"
+#include <limits>
#include <memory>
namespace org {
@@ -229,6 +230,10 @@ class UnsignedLongValidator : public PropertyValidator {
ValidationResult validate(const std::string &subject, const std::string
&input) const {
try {
+ auto negative = input.find_first_of('-') != std::string::npos;
+ if (negative){
+ throw std::out_of_range("non negative expected");
+ }
std::stoull(input);
return
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(true).build();
} catch (...) {
diff --git a/libminifi/include/core/state/Value.h
b/libminifi/include/core/state/Value.h
index 8f714a5..17bbdae 100644
--- a/libminifi/include/core/state/Value.h
+++ b/libminifi/include/core/state/Value.h
@@ -98,6 +98,10 @@ class Value {
}
virtual bool getValue(uint64_t &ref) {
+ const auto negative = string_value.find_first_of('-') != std::string::npos;
+ if (negative){
+ return false;
+ }
ref = std::stoull(string_value);
return true;
}
@@ -227,6 +231,15 @@ class UInt64Value : public Value {
explicit UInt64Value(const std::string &strvalue)
: Value(strvalue),
value(std::stoull(strvalue)) {
+ /**
+ * This is a fundamental change in that it changes where this
error occurs.
+ * We should be prudent about breaking backwards compatibility, but since
UInt64Value
+ * is only created with a validator and type, we **should** be okay.
+ */
+ const auto negative = strvalue.find_first_of('-') != std::string::npos;
+ if (negative){
+ throw std::out_of_range("negative value detected");
+ }
setTypeId<uint64_t>();
}
diff --git a/libminifi/src/core/PropertyValidation.cpp
b/libminifi/src/core/PropertyValidation.cpp
index d92b073..ab1f53a 100644
--- a/libminifi/src/core/PropertyValidation.cpp
+++ b/libminifi/src/core/PropertyValidation.cpp
@@ -28,6 +28,8 @@ StandardValidators::StandardValidators() {
INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
LONG_VALIDATOR = std::make_shared<LongValidator>("LONG_VALIDATOR");
+ // The name is used by the Java NiFi validators, so we keep it as LONG_VALIDATOR
rather than
+ // renaming it to reflect its internal (unsigned) use.
UNSIGNED_LONG_VALIDATOR =
std::make_shared<UnsignedLongValidator>("LONG_VALIDATOR");
DATA_SIZE_VALIDATOR =
std::make_shared<DataSizeValidator>("DATA_SIZE_VALIDATOR");
TIME_PERIOD_VALIDATOR =
std::make_shared<TimePeriodValidator>("TIME_PERIOD_VALIDATOR");