Repository: nifi-minifi-cpp. Updated branches: refs/heads/master df2bb2549 -> e24d54e03
ExtractText processor doesn't handle "Size limit" property This closes #446. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e24d54e0 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e24d54e0 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e24d54e0 Branch: refs/heads/master Commit: e24d54e03df86b53bd18aa6efb9ce2164ac9c003 Parents: df2bb25 Author: Arpad Boda <[email protected]> Authored: Tue Nov 20 13:56:48 2018 +0100 Committer: Marc Parisi <[email protected]> Committed: Tue Nov 27 08:54:08 2018 -0500 ---------------------------------------------------------------------- libminifi/include/processors/ExtractText.h | 5 ++-- libminifi/src/processors/ExtractText.cpp | 37 +++++++++++-------------- libminifi/test/unit/ExtractTextTests.cpp | 28 +++++++++++++++++-- 3 files changed, 43 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e24d54e0/libminifi/include/processors/ExtractText.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ExtractText.h b/libminifi/include/processors/ExtractText.h index b01665e..fcaf09d 100644 --- a/libminifi/include/processors/ExtractText.h +++ b/libminifi/include/processors/ExtractText.h @@ -56,9 +56,9 @@ public: static constexpr int DEFAULT_SIZE_LIMIT = 2 * 1024 * 1024; //! OnTrigger method, implemented by NiFi ExtractText - void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; //! 
Initialize, over write by NiFi ExtractText - void initialize(void); + void initialize(void) override; class ReadCallback : public InputStreamCallback { public: @@ -70,7 +70,6 @@ public: std::shared_ptr<core::FlowFile> flowFile_; core::ProcessContext *ctx_; std::vector<uint8_t> buffer_; - int64_t max_read_; }; protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e24d54e0/libminifi/src/processors/ExtractText.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExtractText.cpp b/libminifi/src/processors/ExtractText.cpp index 328a476..264838e 100644 --- a/libminifi/src/processors/ExtractText.cpp +++ b/libminifi/src/processors/ExtractText.cpp @@ -36,6 +36,8 @@ namespace nifi { namespace minifi { namespace processors { +#define MAX_BUFFER_SIZE 4096 + core::Property ExtractText::Attribute("Attribute", "Attribute to set from content", ""); core::Property ExtractText::SizeLimit("Size Limit", "Maximum number of bytes to read into the attribute. 0 for no limit. Default is 2MB."); core::Relationship ExtractText::Success("success", "success operational on the flow record"); @@ -44,6 +46,7 @@ void ExtractText::initialize() { //! Set the supported properties std::set<core::Property> properties; properties.insert(Attribute); + properties.insert(SizeLimit); setSupportedProperties(properties); //! 
Set the supported relationships std::set<core::Relationship> relationships; @@ -65,9 +68,8 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) { int64_t ret = 0; - uint64_t size_limit = flowFile_->getSize(); uint64_t read_size = 0; - uint64_t loop_read = max_read_; + uint64_t size_limit = flowFile_->getSize(); std::string attrKey, sizeLimitStr; ctx_->getProperty(Attribute.getName(), attrKey); @@ -79,39 +81,32 @@ int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> strea size_limit = std::stoi(sizeLimitStr); std::ostringstream contentStream; - std::string contentStr; while (read_size < size_limit) { - if (size_limit - read_size < (uint64_t) max_read_) - loop_read = size_limit - read_size; - - ret = stream->readData(buffer_, loop_read); - buffer_.resize(ret); + // Don't read more than config limit or the size of the buffer + ret = stream->readData(buffer_, std::min<uint64_t>((size_limit - read_size), buffer_.capacity())); if (ret < 0) { - return -1; + return -1; // Stream error + } else if (ret == 0) { + break; // End of stream, no more data } - if (ret > 0) { - contentStream.write(reinterpret_cast<const char*>(&buffer_[0]), ret); - if (contentStream.fail()) { - return -1; - } - } else { - break; + contentStream.write(reinterpret_cast<const char*>(&buffer_[0]), ret); + read_size += ret; + if (contentStream.fail()) { + return -1; } } - contentStr = contentStream.str(); - flowFile_->setAttribute(attrKey, contentStr); + flowFile_->setAttribute(attrKey, contentStream.str()); return read_size; } ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx) - : max_read_(4096), - flowFile_(flowFile), + : flowFile_(flowFile), ctx_(ctx) { - buffer_.resize(max_read_); + buffer_.reserve(std::min<uint64_t>(flowFile->getSize(), MAX_BUFFER_SIZE)); } } /* namespace processors */ 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e24d54e0/libminifi/test/unit/ExtractTextTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ExtractTextTests.cpp b/libminifi/test/unit/ExtractTextTests.cpp index 5931210..939248f 100644 --- a/libminifi/test/unit/ExtractTextTests.cpp +++ b/libminifi/test/unit/ExtractTextTests.cpp @@ -34,7 +34,6 @@ #include "core/ProcessorNode.h" #include "processors/GetFile.h" -#include "processors/PutFile.h" #include "processors/ExtractText.h" #include "processors/LogAttribute.h" @@ -42,7 +41,7 @@ const char* TEST_TEXT = "Test text\n"; const char* TEST_FILE = "test_file.txt"; const char* TEST_ATTR = "ExtractedText"; -TEST_CASE("Test Creation of ExtractText", "[extracttextCreate]") { +TEST_CASE("Test creation of ExtractText", "[extracttextCreate]") { TestController testController; std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExtractText>("processorname"); REQUIRE(processor->getName() == "processorname"); @@ -53,7 +52,6 @@ TEST_CASE("Test Creation of ExtractText", "[extracttextCreate]") { TEST_CASE("Test usage of ExtractText", "[extracttextTest]") { TestController testController; LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::ExtractText>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>(); LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::GetFile>(); LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); LogTestController::getInstance().setTrace<core::ProcessSession>(); @@ -98,5 +96,29 @@ TEST_CASE("Test usage of ExtractText", "[extracttextTest]") { REQUIRE(LogTestController::getInstance().contains(log_check)); + plan->reset(); + + plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::SizeLimit.getName(), "4"); + + 
LogTestController::getInstance().reset(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); + + std::ofstream test_file_2(test_file_path + "2"); + if (test_file_2.is_open()) { + test_file_2 << TEST_TEXT << std::endl; + test_file_2.close(); + } + + plan->runNextProcessor(); // GetFile + plan->runNextProcessor(); // ExtractText + plan->runNextProcessor(); // LogAttribute + + REQUIRE(LogTestController::getInstance().contains(log_check) == false); + + ss2.str(""); + ss2 << "key:" << TEST_ATTR << " value:" << "Test"; + log_check = ss2.str(); + REQUIRE(LogTestController::getInstance().contains(log_check)); + LogTestController::getInstance().reset(); }
