This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new bcf8921  MINIFICPP-287: Add property for flow files to log
bcf8921 is described below

commit bcf8921b839c967a31c764ff29027b320c462634
Author: Marc Parisi <[email protected]>
AuthorDate: Tue May 14 15:26:05 2019 -0400

    MINIFICPP-287: Add property for flow files to log
    
    This closes #553.
    
    Approved on GH By jdye64.
    
    Signed-off-by: Marc Parisi <[email protected]>
---
 .../processors/LogAttribute.cpp                    | 165 +++++++++++----------
 .../standard-processors/processors/LogAttribute.h  |  10 +-
 .../tests/unit/ProcessorTests.cpp                  |  91 ++++++++++++
 libminifi/include/core/PropertyValidation.h        |   5 +
 libminifi/include/core/state/Value.h               |  13 ++
 libminifi/src/core/PropertyValidation.cpp          |   2 +
 6 files changed, 208 insertions(+), 78 deletions(-)

diff --git a/extensions/standard-processors/processors/LogAttribute.cpp 
b/extensions/standard-processors/processors/LogAttribute.cpp
index 678d12c..a02bafe 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -39,12 +39,17 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property 
LogAttribute::LogLevel(core::PropertyBuilder::createProperty("Log 
Level")->withDescription("The Log Level to use when logging the 
Attributes")->withAllowableValues<std::string>({
+core::Property 
LogAttribute::LogLevel(core::PropertyBuilder::createProperty("Log 
Level")->withDescription("The Log Level to use when logging the 
Attributes")->withAllowableValues<std::string>( {
     "info", "trace", "error", "warn", "debug" })->build());
 
 core::Property LogAttribute::AttributesToLog(
     core::PropertyBuilder::createProperty("Attributes to 
Log")->withDescription("A comma-separated list of Attributes to Log. If not 
specified, all attributes will be logged.")->build());
 
+core::Property LogAttribute::FlowFilesToLog(
+    core::PropertyBuilder::createProperty("FlowFiles To Log")->withDescription(
+        "Number of flow files to log. If set to zero all flow files will be 
logged. Please note that this may block other threads from running if not used 
judiciously.")->withDefaultValue<uint64_t>(1)
+        ->build());
+
 core::Property LogAttribute::AttributesToIgnore(
     core::PropertyBuilder::createProperty("Attributes to 
Ignore")->withDescription("A comma-separated list of Attributes to ignore. If 
not specified, no attributes will be ignored.")->build());
 
@@ -63,6 +68,7 @@ void LogAttribute::initialize() {
   properties.insert(AttributesToLog);
   properties.insert(AttributesToIgnore);
   properties.insert(LogPayload);
+  properties.insert(FlowFilesToLog);
   properties.insert(LogPrefix);
   setSupportedProperties(properties);
   // Set the supported relationships
@@ -71,91 +77,100 @@ void LogAttribute::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void LogAttribute::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
-  logger_->log_trace("enter log attribute");
+void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+  core::Property flowsToLog = FlowFilesToLog;
+
+  if (getProperty(FlowFilesToLog.getName(), flowsToLog)) {
+    // we are going this route since to avoid breaking backwards compatibility 
the get property function doesn't perform validation ( That's done
+    // in configuration. In future releases we can add that exception handling 
there.
+    if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles 
to log: " + flowsToLog.getValue().to_string());
+    flowfiles_to_log_ = flowsToLog.getValue();
+  }
+}
+// OnTrigger method, implemented by NiFi LogAttribute
+void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
+  logger_->log_trace("enter log attribute, attempting to retrieve %u flow 
files", flowfiles_to_log_);
   std::string dashLine = "--------------------------------------------------";
   LogAttrLevel level = LogAttrLevelInfo;
   bool logPayload = false;
   std::ostringstream message;
 
-  std::shared_ptr<core::FlowFile> flow = session->get();
+  uint64_t i = 0;
+  const auto max = flowfiles_to_log_ == 0 ? UINT64_MAX : flowfiles_to_log_;
+  for (; i < max; ++i) {
+    std::shared_ptr<core::FlowFile> flow = session->get();
 
-  if (!flow) {
-    return;
-  }
+    if (!flow) {
+      break;
+    }
 
-  std::string value;
-  if (context->getProperty(LogLevel.getName(), value)) {
-    logLevelStringToEnum(value, level);
-  }
-  if (context->getProperty(LogPrefix.getName(), value)) {
-    dashLine = "-----" + value + "-----";
-  }
+    std::string value;
+    if (context->getProperty(LogLevel.getName(), value)) {
+      logLevelStringToEnum(value, level);
+    }
+    if (context->getProperty(LogPrefix.getName(), value)) {
+      dashLine = "-----" + value + "-----";
+    }
 
-  context->getProperty(LogPayload.getName(), logPayload);
-
-  message << "Logging for flow file " << "\n";
-  message << dashLine;
-  message << "\nStandard FlowFile Attributes";
-  message << "\n" << "UUID:" << flow->getUUIDStr();
-  message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
-  message << "\n" << "lineageStartDate:" << 
getTimeStr(flow->getlineageStartDate());
-  message << "\n" << "Size:" << flow->getSize() << " Offset:" << 
flow->getOffset();
-  message << "\nFlowFile Attributes Map Content";
-  std::map<std::string, std::string> attrs = flow->getAttributes();
-  std::map<std::string, std::string>::iterator it;
-  for (it = attrs.begin(); it != attrs.end(); it++) {
-    message << "\n" << "key:" << it->first << " value:" << it->second;
-  }
-  message << "\nFlowFile Resource Claim Content";
-  std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
-  if (claim) {
-    message << "\n" << "Content Claim:" << claim->getContentFullPath();
-  }
-  if (logPayload && flow->getSize() <= 1024 * 1024) {
-    message << "\n" << "Payload:" << "\n";
-    ReadCallback callback(flow->getSize());
-    session->read(flow, &callback);
-    for (unsigned int i = 0, j = 0; i < callback.read_size_; i++) {
-      message << std::hex << callback.buffer_[i];
-      j++;
-      if (j == 80) {
-        message << '\n';
-        j = 0;
+    context->getProperty(LogPayload.getName(), logPayload);
+
+    message << "Logging for flow file " << "\n";
+    message << dashLine;
+    message << "\nStandard FlowFile Attributes";
+    message << "\n" << "UUID:" << flow->getUUIDStr();
+    message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
+    message << "\n" << "lineageStartDate:" << 
getTimeStr(flow->getlineageStartDate());
+    message << "\n" << "Size:" << flow->getSize() << " Offset:" << 
flow->getOffset();
+    message << "\nFlowFile Attributes Map Content";
+    std::map<std::string, std::string> attrs = flow->getAttributes();
+    std::map<std::string, std::string>::iterator it;
+    for (it = attrs.begin(); it != attrs.end(); it++) {
+      message << "\n" << "key:" << it->first << " value:" << it->second;
+    }
+    message << "\nFlowFile Resource Claim Content";
+    std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
+    if (claim) {
+      message << "\n" << "Content Claim:" << claim->getContentFullPath();
+    }
+    if (logPayload && flow->getSize() <= 1024 * 1024) {
+      message << "\n" << "Payload:" << "\n";
+      ReadCallback callback(flow->getSize());
+      session->read(flow, &callback);
+      for (unsigned int i = 0, j = 0; i < callback.read_size_; i++) {
+        message << std::hex << callback.buffer_[i];
+        j++;
+        if (j == 80) {
+          message << '\n';
+          j = 0;
+        }
       }
     }
+    message << "\n" << dashLine << std::ends;
+    std::string output = message.str();
+
+    switch (level) {
+      case LogAttrLevelInfo:
+        logger_->log_info("%s", output);
+        break;
+      case LogAttrLevelDebug:
+        logger_->log_debug("%s", output);
+        break;
+      case LogAttrLevelError:
+        logger_->log_error("%s", output);
+        break;
+      case LogAttrLevelTrace:
+        logger_->log_trace("%s", output);
+        break;
+      case LogAttrLevelWarn:
+        logger_->log_warn("%s", output);
+        break;
+      default:
+        break;
+    }
+    session->transfer(flow, Success);
   }
-  message << "\n" << dashLine << std::ends;
-  std::string output = message.str();
-
-  switch (level) {
-    case LogAttrLevelInfo:
-      logger_->log_info("%s", output);
-      break;
-    case LogAttrLevelDebug:
-      logger_->log_debug("%s", output);
-      break;
-    case LogAttrLevelError:
-      logger_->log_error("%s", output);
-      break;
-    case LogAttrLevelTrace:
-      logger_->log_trace("%s", output);
-      break;
-    case LogAttrLevelWarn:
-      logger_->log_warn("%s", output);
-      break;
-    default:
-      break;
-  }
-
-  // Test Import
-  /*
-   std::shared_ptr<FlowFileRecord> importRecord = session->create();
-   session->import(claim->getContentFullPath(), importRecord);
-   session->transfer(importRecord, Success); */
-
-  // Transfer to the relationship
-  session->transfer(flow, Success);
+  logger_->log_debug("Logged %d flow files", i);
 }
 
 } /* namespace processors */
diff --git a/extensions/standard-processors/processors/LogAttribute.h 
b/extensions/standard-processors/processors/LogAttribute.h
index 1c5b2e4..5d03e11 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -40,8 +40,9 @@ class LogAttribute : public core::Processor {
   /*!
    * Create a new processor
    */
-  LogAttribute(std::string name,  utils::Identifier uuid = utils::Identifier())
+  LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
       : Processor(name, uuid),
+        flowfiles_to_log_(1),
         logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
   }
   // Destructor
@@ -55,6 +56,7 @@ class LogAttribute : public core::Processor {
   static core::Property AttributesToIgnore;
   static core::Property LogPayload;
   static core::Property LogPrefix;
+  static core::Property FlowFilesToLog;
   // Supported Relationships
   static core::Relationship Success;
   enum LogAttrLevel {
@@ -111,14 +113,16 @@ class LogAttribute : public core::Processor {
   };
 
  public:
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
   // OnTrigger method, implemented by NiFi LogAttribute
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession 
*session);
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi LogAttribute
-  virtual void initialize(void);
+  virtual void initialize(void) override;
 
  protected:
 
  private:
+  uint64_t flowfiles_to_log_;
   // Logger
   std::shared_ptr<logging::Logger> logger_;
 };
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp 
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index a319178..5d7edd6 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -251,6 +251,97 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   LogTestController::getInstance().reset();
 }
 
+TEST_CASE("LogAttributeTestInvalid", "[TestLogAttribute]") {
+  TestController testController;
+  
LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
+  LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", 
"getfileCreate2");
+
+  auto loggattr = plan->addProcessor("LogAttribute", "logattribute", 
core::Relationship("success", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::BatchSize.getName(), "1");
+  REQUIRE_THROWS_AS( plan->setProperty(loggattr, 
org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), 
"-1"),std::out_of_range);
+  LogTestController::getInstance().reset();
+}
+
+void testMultiplesLogAttribute(int fileCount, int flowsToLog, std::string 
verifyStringFlowsLogged = ""){
+  TestController testController;
+    
LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+
+    std::shared_ptr<TestPlan> plan = testController.createPlan();
+    std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", 
"getfileCreate2");
+
+    auto loggattr = plan->addProcessor("LogAttribute", "logattribute", 
core::Relationship("success", "description"), true);
+
+    char format[] = "/tmp/gt.XXXXXX";
+    char *dir = testController.createTempDirectory(format);
+
+    auto flowsToLogStr = std::to_string(flowsToLog);
+    if ( verifyStringFlowsLogged.empty() )
+      verifyStringFlowsLogged = std::to_string(flowsToLog);
+    plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+    plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::BatchSize.getName(), 
std::to_string(fileCount));
+    plan->setProperty(loggattr, 
org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), 
flowsToLogStr);
+    testController.runSession(plan, false);
+    auto records = plan->getProvenanceRecords();
+    std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+    REQUIRE(record == nullptr);
+    REQUIRE(records.size() == 0);
+
+    std::vector<std::string> files;
+
+    for (int i = 0; i < fileCount; i++) {
+      std::fstream file;
+      std::stringstream ss;
+      ss << dir << "/" << "tstFile" << i << ".ext";
+      file.open(ss.str(), std::ios::out);
+      file << "tempFile";
+      file.close();
+
+
+      files.push_back(ss.str());
+    }
+
+    plan->reset();
+    testController.runSession(plan, false);
+
+    for (const auto &created_file : files) {
+      unlink(created_file.c_str());
+    }
+
+    records = plan->getProvenanceRecords();
+    record = plan->getCurrentFlowFile();
+    testController.runSession(plan, false);
+
+    records = plan->getProvenanceRecords();
+    record = plan->getCurrentFlowFile();
+
+    REQUIRE(true == LogTestController::getInstance().contains("Size:8 
Offset:0"));
+    REQUIRE(true == LogTestController::getInstance().contains("key:path 
value:" + std::string(dir)));
+    REQUIRE(true == LogTestController::getInstance().contains("Logged " +  
verifyStringFlowsLogged + " flow files"));
+    LogTestController::getInstance().reset();
+}
+
+TEST_CASE("LogAttributeTestSingle", "[TestLogAttribute]") {
+  testMultiplesLogAttribute(1,3,"1");
+}
+
+
+TEST_CASE("LogAttributeTestMultiple", "[TestLogAttribute]") {
+  testMultiplesLogAttribute(5,3);
+}
+
+TEST_CASE("LogAttributeTestAll", "[TestLogAttribute]") {
+  testMultiplesLogAttribute(5,0,"5");
+}
+
 TEST_CASE("Test Find file", "[getfileCreate3]") {
   TestController testController;
   
LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceReporter>();
diff --git a/libminifi/include/core/PropertyValidation.h 
b/libminifi/include/core/PropertyValidation.h
index fc2a981..2938300 100644
--- a/libminifi/include/core/PropertyValidation.h
+++ b/libminifi/include/core/PropertyValidation.h
@@ -22,6 +22,7 @@
 #include "core/state/Value.h"
 #include "TypedValues.h"
 #include "utils/StringUtils.h"
+#include <limits>
 #include <memory>
 
 namespace org {
@@ -229,6 +230,10 @@ class UnsignedLongValidator : public PropertyValidator {
 
   ValidationResult validate(const std::string &subject, const std::string 
&input) const {
     try {
+      auto negative = input.find_first_of('-') != std::string::npos;
+      if (negative){
+        throw std::out_of_range("non negative expected");
+      }
       std::stoull(input);
       return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(true).build();
     } catch (...) {
diff --git a/libminifi/include/core/state/Value.h 
b/libminifi/include/core/state/Value.h
index 8f714a5..17bbdae 100644
--- a/libminifi/include/core/state/Value.h
+++ b/libminifi/include/core/state/Value.h
@@ -98,6 +98,10 @@ class Value {
   }
 
   virtual bool getValue(uint64_t &ref) {
+    const auto negative = string_value.find_first_of('-') != std::string::npos;
+     if (negative){
+       return false;
+     }
     ref = std::stoull(string_value);
     return true;
   }
@@ -227,6 +231,15 @@ class UInt64Value : public Value {
   explicit UInt64Value(const std::string &strvalue)
       : Value(strvalue),
         value(std::stoull(strvalue)) {
+    /**
+     * This is a fundamental change in that we would be changing where this 
error occurs.
+     * We should be prudent about breaking backwards compatibility, but since 
Uint64Value
+     * is only created with a validator and type, we **should** be okay.
+     */
+    const auto negative = strvalue.find_first_of('-') != std::string::npos;
+     if (negative){
+       throw std::out_of_range("negative value detected");
+     }
     setTypeId<uint64_t>();
   }
 
diff --git a/libminifi/src/core/PropertyValidation.cpp 
b/libminifi/src/core/PropertyValidation.cpp
index d92b073..ab1f53a 100644
--- a/libminifi/src/core/PropertyValidation.cpp
+++ b/libminifi/src/core/PropertyValidation.cpp
@@ -28,6 +28,8 @@ StandardValidators::StandardValidators() {
   INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
   INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
   LONG_VALIDATOR = std::make_shared<LongValidator>("LONG_VALIDATOR");
+  // name is used by java nifi validators, so we should keep this LONG and not 
change to reflect
+  // its internal use
   UNSIGNED_LONG_VALIDATOR = 
std::make_shared<UnsignedLongValidator>("LONG_VALIDATOR");
   DATA_SIZE_VALIDATOR = 
std::make_shared<DataSizeValidator>("DATA_SIZE_VALIDATOR");
   TIME_PERIOD_VALIDATOR = 
std::make_shared<TimePeriodValidator>("TIME_PERIOD_VALIDATOR");

Reply via email to