This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 0bcfc18b955741fd0bc33806e969031bad3952d4
Author: Martin Zink <[email protected]>
AuthorDate: Mon Feb 7 15:17:27 2022 +0100

    MINIFICPP-1702 DefragmentText multiinput improvement
    
    DefragmentText now holds N buffers (one per input source) instead of just
    one, so it can be used with a TailFile that watches multiple files.
    
    Closes #1248
    Signed-off-by: Marton Szasz <[email protected]>
---
 PROCESSORS.md                                      |   2 +-
 .../features/defragtextflowfiles.feature           |  52 ++++-
 .../processors/DefragmentText.cpp                  |  99 +++++----
 .../processors/DefragmentText.h                    |  31 ++-
 .../tests/unit/DefragmentTextTests.cpp             | 222 +++++++++++++++++++--
 libminifi/test/ReadFromFlowFileTestProcessor.cpp   |   3 +-
 libminifi/test/ReadFromFlowFileTestProcessor.h     |  13 ++
 libminifi/test/WriteToFlowFileTestProcessor.h      |   4 +
 8 files changed, 340 insertions(+), 86 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 5b7882f..966f4f7 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -315,7 +315,7 @@ In the list below, the names of required properties appear 
in bold. Any other pr
 
 ### Description
 
-DefragmentText splits and merges incoming flowfiles so cohesive messages are 
not split between them
+DefragmentText splits and merges incoming flowfiles so cohesive messages are 
not split between them. It can handle multiple inputs differentiated by the 
<em>absolute.path</em> flow file attribute.
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
diff --git a/docker/test/integration/features/defragtextflowfiles.feature 
b/docker/test/integration/features/defragtextflowfiles.feature
index 1d20bfc..04220cc 100644
--- a/docker/test/integration/features/defragtextflowfiles.feature
+++ b/docker/test/integration/features/defragtextflowfiles.feature
@@ -2,7 +2,56 @@ Feature: DefragmentText can defragment fragmented data from 
TailFile
   Background:
     Given the content of "/tmp/output" is monitored
 
-  Scenario Outline: DefragmentText merges split messages from TailFile
+  Scenario Outline: DefragmentText correctly merges split messages from 
TailFile multiple file tail-mode
+    Given a TailFile processor with the name "MultiTail" and the "File to 
Tail" property set to "test_file_.*\.log"
+    And the "tail-base-directory" property of the MultiTail processor is set 
to "/tmp/input"
+    And the "tail-mode" property of the MultiTail processor is set to 
"Multiple file"
+    And the "Initial Start Position" property of the MultiTail processor is 
set to "Beginning of File"
+    And the "Input Delimiter" property of the MultiTail processor is set to "%"
+    And a file with filename "test_file_one.log" and content "<input_one>" is 
present in "/tmp/input"
+    And a file with filename "test_file_two.log" and content "<input_two>" is 
present in "/tmp/input"
+    And a DefragmentText processor with the "Pattern" property set to 
"<pattern>"
+    And the "Pattern Location" property of the DefragmentText processor is set 
to "<pattern location>"
+    And a PutFile processor with the name "SuccessPut" and the "Directory" 
property set to "/tmp/output"
+    And the "success" relationship of the MultiTail processor is connected to 
the DefragmentText
+    And the "success" relationship of the DefragmentText processor is 
connected to the SuccessPut
+
+
+    When all instances start up
+    Then flowfiles with these contents are placed in the monitored directory 
in less than 60 seconds: "<success_flow_files>"
+
+    Examples:
+      | input_one                                    | input_two               
                         | pattern       | pattern location | 
success_flow_files                                                          |
+      | <1>cat%dog%mouse%<2>apple%banana%<3>English% | 
<1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch%   | <[0-9]+>      | Start of 
Message | 
<1>cat%dog%mouse%,<1>Katze%Hund%Maus%,<2>apple%banana%,<2>Apfel%Banane%       |
+      | <1>cat%dog%mouse%<2>apple%banana%<3>English% | 
<1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch%   | <[0-9]+>      | End of 
Message   | 
<1>,cat%dog%mouse%<2>,Katze%Hund%Maus%<2>,apple%banana%<3>,Apfel%Banane%<3>   |
+
+  Scenario Outline: DefragmentText correctly merges split messages from 
multiple TailFile
+    Given a TailFile processor with the name "TailOne" and the "File to Tail" 
property set to "/tmp/input/test_file_one.log"
+    And the "Initial Start Position" property of the TailOne processor is set 
to "Beginning of File"
+    And the "Input Delimiter" property of the TailOne processor is set to "%"
+    And a TailFile processor with the name "TailTwo" and the "File to Tail" 
property set to "/tmp/input/test_file_two.log"
+    And the "Initial Start Position" property of the TailTwo processor is set 
to "Beginning of File"
+    And the "Input Delimiter" property of the TailTwo processor is set to "%"
+    And "TailTwo" processor is a start node
+    And a file with filename "test_file_one.log" and content "<input_one>" is 
present in "/tmp/input"
+    And a file with filename "test_file_two.log" and content "<input_two>" is 
present in "/tmp/input"
+    And a DefragmentText processor with the "Pattern" property set to 
"<pattern>"
+    And the "Pattern Location" property of the DefragmentText processor is set 
to "<pattern location>"
+    And a PutFile processor with the name "SuccessPut" and the "Directory" 
property set to "/tmp/output"
+    And the "success" relationship of the TailOne processor is connected to 
the DefragmentText
+    And the "success" relationship of the TailTwo processor is connected to 
the DefragmentText
+    And the "success" relationship of the DefragmentText processor is 
connected to the SuccessPut
+
+
+    When all instances start up
+    Then flowfiles with these contents are placed in the monitored directory 
in less than 60 seconds: "<success_flow_files>"
+
+    Examples:
+      | input_one                                    | input_two               
                         | pattern       | pattern location | 
success_flow_files                                                          |
+      | <1>cat%dog%mouse%<2>apple%banana%<3>English% | 
<1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch%   | <[0-9]+>      | Start of 
Message | 
<1>cat%dog%mouse%,<1>Katze%Hund%Maus%,<2>apple%banana%,<2>Apfel%Banane%       |
+      | <1>cat%dog%mouse%<2>apple%banana%<3>English% | 
<1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch%   | <[0-9]+>      | End of 
Message   | 
<1>,cat%dog%mouse%<2>,Katze%Hund%Maus%<2>,apple%banana%<3>,Apfel%Banane%<3>   |
+
+  Scenario Outline: DefragmentText merges split messages from a single TailFile
     Given a TailFile processor with the "File to Tail" property set to 
"/tmp/input/test_file.log"
     And the "Initial Start Position" property of the TailFile processor is set 
to "Beginning of File"
     And the "Input Delimiter" property of the TailFile processor is set to "%"
@@ -17,7 +66,6 @@ Feature: DefragmentText can defragment fragmented data from 
TailFile
     When all instances start up
     Then flowfiles with these contents are placed in the monitored directory 
in less than 30 seconds: "<success_flow_files>"
 
-
     Examples:
       | input                                                        | pattern 
      | pattern location |  success_flow_files                                  
 |
       | <1> apple%banana%<2> foo%bar%baz%<3> cat%dog%                | 
<[0-9]+>      | Start of Message | <1> apple%banana%,<2> foo%bar%baz%           
         |
diff --git a/extensions/standard-processors/processors/DefragmentText.cpp 
b/extensions/standard-processors/processors/DefragmentText.cpp
index 53a15a4..c093e04 100644
--- a/extensions/standard-processors/processors/DefragmentText.cpp
+++ b/extensions/standard-processors/processors/DefragmentText.cpp
@@ -64,14 +64,14 @@ void DefragmentText::onSchedule(core::ProcessContext* 
context, core::ProcessSess
   gsl_Expects(context);
 
   if (auto max_buffer_age = 
context->getProperty<core::TimePeriodValue>(MaxBufferAge)) {
-    buffer_.setMaxAge(max_buffer_age->getMilliseconds());
+    max_age_ = max_buffer_age->getMilliseconds();
     setTriggerWhenEmpty(true);
     logger_->log_trace("The Buffer maximum age is configured to be %" PRId64 " 
ms", int64_t{max_buffer_age->getMilliseconds().count()});
   }
 
   auto max_buffer_size = 
context->getProperty<core::DataSizeValue>(MaxBufferSize);
   if (max_buffer_size.has_value() && max_buffer_size->getValue() > 0) {
-    buffer_.setMaxSize(max_buffer_size->getValue());
+    max_size_ = max_buffer_size->getValue();
     logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 
" B", max_buffer_size->getValue());
   }
 
@@ -98,32 +98,40 @@ void DefragmentText::onTrigger(core::ProcessContext*, 
core::ProcessSession* sess
     if (original_flow_file)
       processNextFragment(session, 
gsl::not_null(std::move(original_flow_file)));
   }
-  if (buffer_.maxSizeReached()) {
-    buffer_.flushAndReplace(session, Failure, nullptr);
-    return;
+  for (auto& [fragment_source_id, fragment_source] : fragment_sources_) {
+    if (fragment_source.buffer.maxSizeReached(max_size_)) {
+      fragment_source.buffer.flushAndReplace(session, Failure, nullptr);
+    } else if (fragment_source.buffer.maxAgeReached(max_age_)) {
+      fragment_source.buffer.flushAndReplace(session, pattern_location_ == 
PatternLocation::START_OF_MESSAGE ? Success : Failure, nullptr);
+    }
   }
-  if (buffer_.maxAgeReached()) {
-    if (pattern_location_ == PatternLocation::START_OF_MESSAGE)
-      buffer_.flushAndReplace(session, Success, nullptr);
-    else
-      buffer_.flushAndReplace(session, Failure, nullptr);
+}
+
+namespace {
+std::optional<size_t> getFragmentOffset(const core::FlowFile& flow_file) {
+  if (auto offset_attribute = 
flow_file.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE)) {
+    return std::stoi(*offset_attribute);
   }
+  return std::nullopt;
 }
+}  // namespace
 
 void DefragmentText::processNextFragment(core::ProcessSession *session, const 
gsl::not_null<std::shared_ptr<core::FlowFile>>& next_fragment) {
-  if (!buffer_.isCompatible(*next_fragment)) {
-    buffer_.flushAndReplace(session, Failure, nullptr);
+  auto fragment_source_id = FragmentSource::Id(*next_fragment);
+  auto& fragment_source = fragment_sources_[fragment_source_id];
+  auto& buffer = fragment_source.buffer;
+  if (!buffer.empty() && buffer.getNextFragmentOffset() != 
getFragmentOffset(*next_fragment)) {
+    buffer.flushAndReplace(session, Failure, nullptr);
     session->transfer(next_fragment, Failure);
     return;
   }
   std::shared_ptr<core::FlowFile> split_before_last_pattern;
   std::shared_ptr<core::FlowFile> split_after_last_pattern;
-  bool found_pattern = splitFlowFileAtLastPattern(session, next_fragment, 
split_before_last_pattern,
-                                                  split_after_last_pattern);
+  bool found_pattern = splitFlowFileAtLastPattern(session, next_fragment, 
split_before_last_pattern, split_after_last_pattern);
   if (split_before_last_pattern)
-    buffer_.append(session, 
gsl::not_null(std::move(split_before_last_pattern)));
+    buffer.append(session, 
gsl::not_null(std::move(split_before_last_pattern)));
   if (found_pattern) {
-    buffer_.flushAndReplace(session, Success, split_after_last_pattern);
+    buffer.flushAndReplace(session, Success, split_after_last_pattern);
   }
   session->remove(next_fragment);
 }
@@ -259,24 +267,16 @@ void DefragmentText::Buffer::append(core::ProcessSession* 
session, const gsl::no
   session->remove(flow_file_to_append);
 }
 
-bool DefragmentText::Buffer::maxSizeReached() const {
+bool DefragmentText::Buffer::maxSizeReached(const std::optional<size_t> 
max_size) const {
   return !empty()
-      && max_size_.has_value()
-      && (max_size_.value() < buffered_flow_file_->getSize());
+      && max_size.has_value()
+      && (max_size.value() < buffered_flow_file_->getSize());
 }
 
-bool DefragmentText::Buffer::maxAgeReached() const {
+bool DefragmentText::Buffer::maxAgeReached(const 
std::optional<std::chrono::milliseconds> max_age) const {
   return !empty()
-      && max_age_.has_value()
-      && (creation_time_ + max_age_.value() < 
std::chrono::steady_clock::now());
-}
-
-void DefragmentText::Buffer::setMaxAge(std::chrono::milliseconds max_age) {
-  max_age_ = max_age;
-}
-
-void DefragmentText::Buffer::setMaxSize(size_t max_size) {
-  max_size_ = max_size;
+      && max_age.has_value()
+      && (creation_time_ + max_age.value() < std::chrono::steady_clock::now());
 }
 
 void DefragmentText::Buffer::flushAndReplace(core::ProcessSession* session, 
const core::Relationship& relationship,
@@ -297,32 +297,25 @@ void DefragmentText::Buffer::store(core::ProcessSession* 
session, const std::sha
   }
 }
 
-bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) 
const {
+std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
   if (empty())
-    return true;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  std::string current_offset_str, append_offset_str;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, 
current_offset_str)
-      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, 
append_offset_str)) {
-    return false;
-  }
-  if (!current_offset_str.empty() && !append_offset_str.empty()) {
-    size_t current_offset = std::stoi(current_offset_str);
-    size_t append_offset = std::stoi(append_offset_str);
-    if (current_offset + buffered_flow_file_->getSize() != append_offset)
-      return false;
-  }
-  return true;
+    return std::nullopt;
+  if (auto offset_attribute = 
buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE))
+    return std::stoi(*offset_attribute) + buffered_flow_file_->getSize();
+  return std::nullopt;
+}
+
+DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
+  if (auto absolute_path = 
flow_file.getAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH))
+    absolute_path_ = *absolute_path;
+}
+
+size_t DefragmentText::FragmentSource::Id::hash::operator() (const Id& 
fragment_id) const {
+  return std::hash<std::optional<std::string>>{}(fragment_id.absolute_path_);
 }
 
-REGISTER_RESOURCE(DefragmentText, "DefragmentText splits and merges incoming 
flowfiles so cohesive messages are not split between them");
+REGISTER_RESOURCE(DefragmentText, "DefragmentText splits and merges incoming 
flowfiles so cohesive messages are not split between them. "
+                                  "It can handle multiple inputs 
differentiated by the absolute.path flow file attribute.");
 
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/DefragmentText.h 
b/extensions/standard-processors/processors/DefragmentText.h
index ee2b2e7..a508c73 100644
--- a/extensions/standard-processors/processors/DefragmentText.h
+++ b/extensions/standard-processors/processors/DefragmentText.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <string>
 #include <set>
+#include <unordered_map>
 
 #include "core/Processor.h"
 #include "core/FlowFileStore.h"
@@ -64,32 +65,46 @@ class DefragmentText : public core::Processor {
  protected:
   class Buffer {
    public:
-    bool isCompatible(const core::FlowFile& fragment) const;
     void append(core::ProcessSession* session, const 
gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append);
-    bool maxSizeReached() const;
-    bool maxAgeReached() const;
-    void setMaxAge(std::chrono::milliseconds max_age);
-    void setMaxSize(size_t max_size);
+    bool maxSizeReached(const std::optional<size_t> max_size) const;
+    bool maxAgeReached(const std::optional<std::chrono::milliseconds> max_age) 
const;
     void flushAndReplace(core::ProcessSession* session, const 
core::Relationship& relationship,
                          const std::shared_ptr<core::FlowFile>& 
new_buffered_flow_file);
 
     bool empty() const { return buffered_flow_file_ == nullptr; }
+    std::optional<size_t> getNextFragmentOffset() const;
 
    private:
     void store(core::ProcessSession* session, const 
std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
 
     std::shared_ptr<core::FlowFile> buffered_flow_file_;
     std::chrono::steady_clock::time_point creation_time_;
-    std::optional<std::chrono::milliseconds> max_age_;
-    std::optional<size_t> max_size_;
   };
 
+  struct FragmentSource {
+    class Id {
+     public:
+      explicit Id(const core::FlowFile& flow_file);
+      struct hash {
+        size_t operator()(const Id& fragment_id) const;
+      };
+      bool operator==(const Id& rhs) const = default;
+     protected:
+      std::optional<std::string> absolute_path_;
+    };
+
+    Buffer buffer;
+  };
+
+
   std::regex pattern_;
   PatternLocation pattern_location_;
+  std::optional<std::chrono::milliseconds> max_age_;
+  std::optional<size_t> max_size_;
 
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<DefragmentText>::getLogger();
   core::FlowFileStore flow_file_store_;
-  Buffer buffer_;
+  std::unordered_map<FragmentSource::Id, FragmentSource, 
FragmentSource::Id::hash> fragment_sources_;
 
   void processNextFragment(core::ProcessSession *session, const 
gsl::not_null<std::shared_ptr<core::FlowFile>>& next_fragment);
 
diff --git a/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp 
b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
index 9cd32de..f312ee4 100644
--- a/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
+++ b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
@@ -22,15 +22,14 @@
 #include "UpdateAttribute.h"
 #include "DefragmentText.h"
 #include "TextFragmentUtils.h"
-#include "utils/TestUtils.h"
 #include "serialization/PayloadSerializer.h"
 #include "serialization/FlowFileSerializer.h"
-#include "unit/ContentRepositoryDependentTests.h"
 
 using WriteToFlowFileTestProcessor = 
org::apache::nifi::minifi::processors::WriteToFlowFileTestProcessor;
 using ReadFromFlowFileTestProcessor = 
org::apache::nifi::minifi::processors::ReadFromFlowFileTestProcessor;
 using UpdateAttribute = org::apache::nifi::minifi::processors::UpdateAttribute;
 using DefragmentText = org::apache::nifi::minifi::processors::DefragmentText;
+namespace textfragmentutils = 
org::apache::nifi::minifi::processors::textfragmentutils;
 
 TEST_CASE("DefragmentText Single source tests", 
"[defragmenttextsinglesource]") {
   TestController testController;
@@ -217,32 +216,213 @@ TEST_CASE("DefragmentText Single source tests", 
"[defragmenttextsinglesource]")
   }
 }
 
-TEST_CASE("DefragmentTextInvalidSources", "[defragmenttextinvalidsources]") {
+TEST_CASE("DefragmentTextMultipleSources", "[defragmenttextinvalidsources]") {
   TestController testController;
   auto plan = testController.createPlan();
-  auto write_to_flow_file = 
std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor",
 "write_to_flow_file"));
-  auto update_ff = 
std::dynamic_pointer_cast<UpdateAttribute>(plan->addProcessor("UpdateAttribute",
 "update_attribute"));
-  auto defrag_text_flow_files =  
std::dynamic_pointer_cast<DefragmentText>(plan->addProcessor("DefragmentText", 
"defrag_text_flow_files"));
+  auto input_1 = 
std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor",
 "input_1"));
+  auto input_2 = 
std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor",
 "input_2"));
+  auto update_ff_1 = 
std::dynamic_pointer_cast<UpdateAttribute>(plan->addProcessor("UpdateAttribute",
 "update_attribute_1"));
+  auto update_ff_2 = 
std::dynamic_pointer_cast<UpdateAttribute>(plan->addProcessor("UpdateAttribute",
 "update_attribute_2"));
+  auto defrag_text_flow_files = 
std::dynamic_pointer_cast<DefragmentText>(plan->addProcessor("DefragmentText", 
"defrag_text_flow_files"));
   auto read_from_failure_relationship = 
std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor",
 "read_from_failure_relationship"));
+  auto read_from_success_relationship = 
std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor",
 "read_from_success_relationship"));
 
-  plan->addConnection(write_to_flow_file, 
WriteToFlowFileTestProcessor::Success, update_ff);
-  plan->addConnection(update_ff, UpdateAttribute ::Success, 
defrag_text_flow_files);
+  plan->addConnection(input_1, WriteToFlowFileTestProcessor::Success, 
update_ff_1);
+  plan->addConnection(input_2, WriteToFlowFileTestProcessor::Success, 
update_ff_2);
+  plan->addConnection(update_ff_1, UpdateAttribute::Success, 
defrag_text_flow_files);
+  plan->addConnection(update_ff_2, UpdateAttribute::Success, 
defrag_text_flow_files);
 
   plan->addConnection(defrag_text_flow_files, DefragmentText::Failure, 
read_from_failure_relationship);
-  
defrag_text_flow_files->setAutoTerminatedRelationships({DefragmentText::Success});
+  plan->addConnection(defrag_text_flow_files, DefragmentText::Success, 
read_from_success_relationship);
 
+  read_from_failure_relationship->disableClearOnTrigger();
+  read_from_success_relationship->disableClearOnTrigger();
   
read_from_failure_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  
read_from_success_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), 
"%");
+
+  SECTION("Multiple Sources with different fragment attributes") {
+    plan->setProperty(update_ff_1, core::SpecialFlowAttribute::ABSOLUTE_PATH, 
"input_1", true);
+    plan->setProperty(update_ff_2, core::SpecialFlowAttribute::ABSOLUTE_PATH, 
"input_2", true);
 
-  plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), 
"<[0-9]+>");
-  plan->setProperty(update_ff, 
org::apache::nifi::minifi::processors::textfragmentutils::BASE_NAME_ATTRIBUTE, 
"${UUID()}", true);
-
-  write_to_flow_file->setContent("Foo <1> Foo");
-  testController.runSession(plan);
-  CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
-  write_to_flow_file->setContent("Bar <2> Bar");
-  plan->reset();
-  testController.runSession(plan);
-  CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 2);
-  CHECK(read_from_failure_relationship->readFlowFileWithContent("<1> Foo"));
-  CHECK(read_from_failure_relationship->readFlowFileWithContent("Bar <2> 
Bar"));
+    input_1->setContent("abc%def");
+    input_2->setContent("ABC%DEF");
+    testController.runSession(plan);
+    plan->reset();
+    input_1->clearContent();
+    input_2->clearContent();
+    testController.runSession(plan);
+
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 2);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("abc"));
+    CHECK(read_from_success_relationship->readFlowFileWithContent("ABC"));
+
+    plan->reset();
+    input_1->setContent("ghi%jkl");
+    input_2->setContent("GHI%JKL");
+    testController.runSession(plan);
+    plan->reset();
+    input_1->clearContent();
+    input_2->clearContent();
+    testController.runSession(plan);
+
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 4);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("%defghi"));
+    CHECK(read_from_success_relationship->readFlowFileWithContent("%DEFGHI"));
+  }
+
+  SECTION("Multiple Sources with same fragment attributes mix up") {
+    plan->setProperty(update_ff_1, core::SpecialFlowAttribute::ABSOLUTE_PATH, 
"input", true);
+    plan->setProperty(update_ff_2, core::SpecialFlowAttribute::ABSOLUTE_PATH, 
"input", true);
+
+    input_1->setContent("abc%def");
+    input_2->setContent("ABC%DEF");
+    testController.runSession(plan);
+    plan->reset();
+    input_1->clearContent();
+    input_2->clearContent();
+    testController.runSession(plan);
+
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 2);
+    CHECK((read_from_success_relationship->readFlowFileWithContent("abc") || 
read_from_success_relationship->readFlowFileWithContent("ABC")));
+    CHECK((read_from_success_relationship->readFlowFileWithContent("%DEFabc") 
|| read_from_success_relationship->readFlowFileWithContent("%defABC")));
+
+    plan->reset();
+    input_1->setContent("ghi%jkl");
+    input_2->setContent("GHI%JKL");
+    testController.runSession(plan);
+    plan->reset();
+    input_1->clearContent();
+    input_2->clearContent();
+    testController.runSession(plan);
+
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 4);
+    CHECK((read_from_success_relationship->readFlowFileWithContent("%defghi")
+        || read_from_success_relationship->readFlowFileWithContent("%defGHI")
+        || read_from_success_relationship->readFlowFileWithContent("%DEFGHI")
+        || 
read_from_success_relationship->readFlowFileWithContent("%DEFghi")));
+  }
+}
+
+class FragmentGenerator : public core::Processor {
+ public:
+  static inline const core::Relationship Success = 
core::Relationship("success", "success operational on the flow record");
+  explicit FragmentGenerator(const std::string& name, const utils::Identifier& 
uuid = utils::Identifier())
+      : Processor(name, uuid) {
+  }
+
+  void onTrigger(core::ProcessContext*, core::ProcessSession* session) 
override {
+    std::vector<core::FlowFile> flow_files;
+    for (const size_t max_i = i_ + batch_size_; i_ < fragment_contents_.size() 
&& i_ < max_i; ++i_) {
+      auto& fragment_content = fragment_contents_[i_];
+      WriteCallback callback(fragment_content);
+      std::shared_ptr<core::FlowFile> flow_file = session->create();
+      if (base_name_attribute_)
+        flow_file->addAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, 
*base_name_attribute_);
+      if (post_name_attribute_)
+        flow_file->addAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, 
*post_name_attribute_);
+      if (absolute_path_attribute_)
+        flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, 
*absolute_path_attribute_);
+      flow_file->addAttribute(textfragmentutils::OFFSET_ATTRIBUTE, 
std::to_string(offset_));
+      offset_ += fragment_content.size();
+      session->write(flow_file, &callback);
+      session->transfer(flow_file, Success);
+    }
+  }
+  void initialize() override { setSupportedRelationships({Success});}
+
+
+  void setFragments(std::vector<std::string>&& fragments) {fragment_contents_ 
= std::move(fragments);}
+  void setBatchSize(const size_t batch_size) {batch_size_ = batch_size;}
+  void setAbsolutePathAttribute(const std::string& absolute_path_attribute) { 
absolute_path_attribute_ = absolute_path_attribute; }
+  void setBaseNameAttribute(const std::string& base_name_attribute) { 
base_name_attribute_ = base_name_attribute; }
+  void setPostNameAttribute(const std::string& post_name_attribute) { 
post_name_attribute_ = post_name_attribute; }
+  void clearAbsolutePathAttribute() { absolute_path_attribute_.reset(); }
+  void clearPostNameAttribute() { post_name_attribute_.reset(); }
+  void clearBaseNameAttribute() { base_name_attribute_.reset(); }
+
+ protected:
+  struct WriteCallback : public 
org::apache::nifi::minifi::OutputStreamCallback {
+    const gsl::span<const uint8_t> content_;
+
+    explicit WriteCallback(const std::string& content) : 
content_(reinterpret_cast<const uint8_t*>(content.data()), content.size()) {}
+
+    int64_t process(const 
std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> &stream) override {
+      size_t bytes_written = stream->write(content_.begin(), content_.size());
+      return org::apache::nifi::minifi::io::isError(bytes_written) ? -1 : 
gsl::narrow<int64_t>(bytes_written);
+    }
+  };
+
+  size_t offset_ = 0;
+  size_t batch_size_ = 1;
+  size_t i_ = 0;
+  std::optional<std::string> absolute_path_attribute_;
+  std::optional<std::string> base_name_attribute_;
+  std::optional<std::string> post_name_attribute_;
+  std::vector<std::string> fragment_contents_;
+};
+
+REGISTER_RESOURCE(FragmentGenerator, "FragmentGenerator (only for testing 
purposes)");
+
+TEST_CASE("DefragmentText with offset attributes", 
"[defragmenttextoffsetattributes]") {
+  TestController testController;
+  auto plan = testController.createPlan();
+  auto input_1 = 
std::dynamic_pointer_cast<FragmentGenerator>(plan->addProcessor("FragmentGenerator",
 "input_1"));
+  auto input_2 = 
std::dynamic_pointer_cast<FragmentGenerator>(plan->addProcessor("FragmentGenerator",
 "input_2"));
+
+  auto defrag_text_flow_files = 
std::dynamic_pointer_cast<DefragmentText>(plan->addProcessor("DefragmentText", 
"defrag_text_flow_files"));
+  auto read_from_failure_relationship = 
std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor",
 "read_from_failure_relationship"));
+  auto read_from_success_relationship = 
std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor",
 "read_from_success_relationship"));
+
+  plan->addConnection(input_1, FragmentGenerator::Success, 
defrag_text_flow_files);
+  plan->addConnection(input_2, FragmentGenerator::Success, 
defrag_text_flow_files);
+
+  plan->addConnection(defrag_text_flow_files, DefragmentText::Failure, 
read_from_failure_relationship);
+  plan->addConnection(defrag_text_flow_files, DefragmentText::Success, 
read_from_success_relationship);
+
+  read_from_failure_relationship->disableClearOnTrigger();
+  read_from_success_relationship->disableClearOnTrigger();
+  
read_from_failure_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  
read_from_success_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), 
"%");
+  input_1->setBaseNameAttribute("input_1");
+  input_2->setBaseNameAttribute("input_2");
+  input_1->setPostNameAttribute("log");
+  input_2->setPostNameAttribute("log");
+  input_1->setAbsolutePathAttribute("/tmp/input/input_1.log");
+  input_2->setAbsolutePathAttribute("/tmp/input/input_2.log");
+
+  SECTION("Single source input with offsets") {
+    input_1->setFragments({"foo%bar", "%baz,app", "le%"});
+    for (size_t i=0; i < 10; ++i) {
+      testController.runSession(plan);
+      plan->reset();
+    }
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 3);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("foo"));
+    CHECK(read_from_success_relationship->readFlowFileWithContent("%bar"));
+    
CHECK(read_from_success_relationship->readFlowFileWithContent("%baz,apple"));
+  }
+
+  SECTION("Two input sources with offsets") {
+    input_1->setFragments({"foo%bar", "%baz,app", "le%"});
+    input_2->setFragments({"monkey%dog", "%cat,octopu", "s%"});
+    for (size_t i=0; i < 10; ++i) {
+      testController.runSession(plan);
+      plan->reset();
+    }
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 6);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("foo"));
+    CHECK(read_from_success_relationship->readFlowFileWithContent("%bar"));
+    
CHECK(read_from_success_relationship->readFlowFileWithContent("%baz,apple"));
+    CHECK(read_from_success_relationship->readFlowFileWithContent("monkey"));
+    CHECK(read_from_success_relationship->readFlowFileWithContent("%dog"));
+    
CHECK(read_from_success_relationship->readFlowFileWithContent("%cat,octopus"));
+  }
 }
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.cpp 
b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
index c33bdb2..1fdfef9 100644
--- a/libminifi/test/ReadFromFlowFileTestProcessor.cpp
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
@@ -48,7 +48,8 @@ struct ReadFlowFileIntoBuffer : public InputStreamCallback {
 void ReadFromFlowFileTestProcessor::onTrigger(core::ProcessContext* context, 
core::ProcessSession* session) {
   gsl_Expects(context && session);
   logger_->log_info("%s", ON_TRIGGER_LOG_STR);
-  flow_files_read_.clear();
+  if (clear_on_trigger_)
+    clear();
 
   while (std::shared_ptr<core::FlowFile> flow_file = session->get()) {
     ReadFlowFileIntoBuffer callback;
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.h 
b/libminifi/test/ReadFromFlowFileTestProcessor.h
index 790478e..c8db070 100644
--- a/libminifi/test/ReadFromFlowFileTestProcessor.h
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.h
@@ -58,12 +58,25 @@ class ReadFromFlowFileTestProcessor : public 
core::Processor {
     return flow_files_read_.size();
   }
 
+  void enableClearOnTrigger() {
+    clear_on_trigger_ = true;
+  }
+
+  void disableClearOnTrigger() {
+    clear_on_trigger_ = false;
+  }
+
+  void clear() {
+    flow_files_read_.clear();
+  }
+
  private:
   struct FlowFileData {
     FlowFileData(core::ProcessSession* session, const 
gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file);
     std::string content_;
     std::map<std::string, std::string> attributes_;
   };
+  bool clear_on_trigger_ = true;
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<ReadFromFlowFileTestProcessor>::getLogger();
   std::vector<FlowFileData> flow_files_read_;
 };
diff --git a/libminifi/test/WriteToFlowFileTestProcessor.h 
b/libminifi/test/WriteToFlowFileTestProcessor.h
index 6d05221..787c0d0 100644
--- a/libminifi/test/WriteToFlowFileTestProcessor.h
+++ b/libminifi/test/WriteToFlowFileTestProcessor.h
@@ -53,6 +53,10 @@ class WriteToFlowFileTestProcessor : public core::Processor {
     content_ = std::move(content);
   }
 
+  void clearContent() {
+    content_.clear();
+  }
+
  private:
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<WriteToFlowFileTestProcessor>::getLogger();
   std::string content_;

Reply via email to