Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master df2bb2549 -> e24d54e03


ExtractText processor doesn't handle "Size Limit" property

This closes #446.

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e24d54e0
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e24d54e0
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e24d54e0

Branch: refs/heads/master
Commit: e24d54e03df86b53bd18aa6efb9ce2164ac9c003
Parents: df2bb25
Author: Arpad Boda <[email protected]>
Authored: Tue Nov 20 13:56:48 2018 +0100
Committer: Marc Parisi <[email protected]>
Committed: Tue Nov 27 08:54:08 2018 -0500

----------------------------------------------------------------------
 libminifi/include/processors/ExtractText.h |  5 ++--
 libminifi/src/processors/ExtractText.cpp   | 37 +++++++++++--------------
 libminifi/test/unit/ExtractTextTests.cpp   | 28 +++++++++++++++++--
 3 files changed, 43 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e24d54e0/libminifi/include/processors/ExtractText.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ExtractText.h b/libminifi/include/processors/ExtractText.h
index b01665e..fcaf09d 100644
--- a/libminifi/include/processors/ExtractText.h
+++ b/libminifi/include/processors/ExtractText.h
@@ -56,9 +56,9 @@ public:
     static constexpr int DEFAULT_SIZE_LIMIT = 2 * 1024 * 1024;
 
     //! OnTrigger method, implemented by NiFi ExtractText
-    void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+    void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
     //! Initialize, over write by NiFi ExtractText
-    void initialize(void);
+    void initialize(void) override;
 
     class ReadCallback : public InputStreamCallback {
     public:
@@ -70,7 +70,6 @@ public:
         std::shared_ptr<core::FlowFile> flowFile_;
         core::ProcessContext *ctx_;
         std::vector<uint8_t> buffer_;
-        int64_t max_read_;
     };
 
 protected:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e24d54e0/libminifi/src/processors/ExtractText.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExtractText.cpp b/libminifi/src/processors/ExtractText.cpp
index 328a476..264838e 100644
--- a/libminifi/src/processors/ExtractText.cpp
+++ b/libminifi/src/processors/ExtractText.cpp
@@ -36,6 +36,8 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
+#define MAX_BUFFER_SIZE 4096
+
 core::Property ExtractText::Attribute("Attribute", "Attribute to set from content", "");
 core::Property ExtractText::SizeLimit("Size Limit", "Maximum number of bytes to read into the attribute. 0 for no limit. Default is 2MB.");
 core::Relationship ExtractText::Success("success", "success operational on the flow record");
@@ -44,6 +46,7 @@ void ExtractText::initialize() {
   //! Set the supported properties
   std::set<core::Property> properties;
   properties.insert(Attribute);
+  properties.insert(SizeLimit);
   setSupportedProperties(properties);
   //! Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -65,9 +68,8 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
 
 int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
   int64_t ret = 0;
-  uint64_t size_limit = flowFile_->getSize();
   uint64_t read_size = 0;
-  uint64_t loop_read = max_read_;
+  uint64_t size_limit = flowFile_->getSize();
 
   std::string attrKey, sizeLimitStr;
   ctx_->getProperty(Attribute.getName(), attrKey);
@@ -79,39 +81,32 @@ int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> strea
     size_limit = std::stoi(sizeLimitStr);
 
   std::ostringstream contentStream;
-  std::string contentStr;
 
   while (read_size < size_limit) {
-    if (size_limit - read_size < (uint64_t) max_read_)
-      loop_read = size_limit - read_size;
-
-    ret = stream->readData(buffer_, loop_read);
-    buffer_.resize(ret);
+    // Don't read more than config limit or the size of the buffer
+    ret = stream->readData(buffer_, std::min<uint64_t>((size_limit - read_size), buffer_.capacity()));
 
     if (ret < 0) {
-      return -1;
+      return -1;  // Stream error
+    } else if (ret == 0) {
+      break;  // End of stream, no more data
     }
 
-    if (ret > 0) {
-      contentStream.write(reinterpret_cast<const char*>(&buffer_[0]), ret);
-      if (contentStream.fail()) {
-        return -1;
-      }
-    } else {
-      break;
+    contentStream.write(reinterpret_cast<const char*>(&buffer_[0]), ret);
+    read_size += ret;
+    if (contentStream.fail()) {
+      return -1;
     }
   }
 
-  contentStr = contentStream.str();
-  flowFile_->setAttribute(attrKey, contentStr);
+  flowFile_->setAttribute(attrKey, contentStream.str());
   return read_size;
 }
 
 ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx)
-    : max_read_(4096),
-      flowFile_(flowFile),
+    : flowFile_(flowFile),
       ctx_(ctx) {
-  buffer_.resize(max_read_);
+  buffer_.reserve(std::min<uint64_t>(flowFile->getSize(), MAX_BUFFER_SIZE));
 }
 
 } /* namespace processors */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e24d54e0/libminifi/test/unit/ExtractTextTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ExtractTextTests.cpp b/libminifi/test/unit/ExtractTextTests.cpp
index 5931210..939248f 100644
--- a/libminifi/test/unit/ExtractTextTests.cpp
+++ b/libminifi/test/unit/ExtractTextTests.cpp
@@ -34,7 +34,6 @@
 #include "core/ProcessorNode.h"
 
 #include "processors/GetFile.h"
-#include "processors/PutFile.h"
 #include "processors/ExtractText.h"
 #include "processors/LogAttribute.h"
 
@@ -42,7 +41,7 @@ const char* TEST_TEXT = "Test text\n";
 const char* TEST_FILE = "test_file.txt";
 const char* TEST_ATTR = "ExtractedText";
 
-TEST_CASE("Test Creation of ExtractText", "[extracttextCreate]") {
+TEST_CASE("Test creation of ExtractText", "[extracttextCreate]") {
     TestController testController;
     std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExtractText>("processorname");
     REQUIRE(processor->getName() == "processorname");
@@ -53,7 +52,6 @@ TEST_CASE("Test Creation of ExtractText", "[extracttextCreate]") {
 TEST_CASE("Test usage of ExtractText", "[extracttextTest]") {
     TestController testController;
     LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::ExtractText>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
     LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::GetFile>();
     LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
     LogTestController::getInstance().setTrace<core::ProcessSession>();
@@ -98,5 +96,29 @@ TEST_CASE("Test usage of ExtractText", "[extracttextTest]") {
 
     REQUIRE(LogTestController::getInstance().contains(log_check));
 
+    plan->reset();
+
+    plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::SizeLimit.getName(), "4");
+
+    LogTestController::getInstance().reset();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+
+    std::ofstream test_file_2(test_file_path + "2");
+    if (test_file_2.is_open()) {
+        test_file_2 << TEST_TEXT << std::endl;
+        test_file_2.close();
+    }
+
+    plan->runNextProcessor();  // GetFile
+    plan->runNextProcessor();  // ExtractText
+    plan->runNextProcessor();  // LogAttribute
+
+    REQUIRE(LogTestController::getInstance().contains(log_check) == false);
+
+    ss2.str("");
+    ss2 << "key:" << TEST_ATTR << " value:" << "Test";
+    log_check = ss2.str();
+    REQUIRE(LogTestController::getInstance().contains(log_check));
+
     LogTestController::getInstance().reset();
 }

Reply via email to