lordgamez commented on code in PR #1682:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1682#discussion_r1395492616


##########
extensions/standard-processors/tests/unit/SplitTextTests.cpp:
##########
@@ -0,0 +1,860 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FlowFileRecord.h"
+#include "TestBase.h"
+#include "Catch.h"
+#include "processors/SplitText.h"
+#include "SingleProcessorTestController.h"
+#include "io/BufferStream.h"
+
+namespace org::apache::nifi::minifi::test {
+
+TEST_CASE("Test LineReader with nullptr") {
+  processors::detail::LineReader reader{nullptr};
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with empty stream") {
+  auto stream = std::make_shared<io::BufferStream>();
+  processors::detail::LineReader reader{nullptr};
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with trailing endline") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = "this is a new line\nand another line\r\nthirdline\n";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 19, .endline_size 
= 1});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 19, .size = 18, 
.endline_size = 2});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 37, .size = 10, 
.endline_size = 1});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader without trailing endlines") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = "this is a new line\nand another line\r\nthirdline";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 19, .endline_size 
= 1});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 19, .size = 18, 
.endline_size = 2});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 37, .size = 9, .endline_size 
= 0});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with input larger than buffer length") {
+  auto stream = std::make_shared<io::BufferStream>();
+  const auto first_line_size = 
static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5);
+  const auto second_line_size = 
static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.7);
+  std::string input = std::string(first_line_size, 'a') + "\n" + 
std::string(second_line_size, 'b') + "\n";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = first_line_size + 
1, .endline_size = 1});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = first_line_size +1 , .size = 
second_line_size + 1, .endline_size = 1});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with input of same size as buffer length") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE - 
1, 'a') + "\n" + std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2 - 
1, 'b') + "\n";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE, .endline_size = 1});
+  CHECK(*reader.readNextLine() ==
+    processors::detail::LineReader::LineInfo{.offset = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE, .size = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2, .endline_size = 1});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with input larger than buffer length without 
trailing endline") {
+  auto stream = std::make_shared<io::BufferStream>();
+  const auto first_line_size = 
static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5);
+  const auto second_line_size = 
static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.7);
+  std::string input = std::string(first_line_size, 'a') + "\n" + 
std::string(second_line_size, 'b');
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = first_line_size + 
1, .endline_size = 1});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = first_line_size + 1, .size = 
second_line_size, .endline_size = 0});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with input of same size as buffer length without 
trailing endline") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE - 
1, 'a') + "\n" + std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2, 
'b');
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE, .endline_size = 1});
+  CHECK(*reader.readNextLine() ==
+    processors::detail::LineReader::LineInfo{.offset = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE, .size = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2, .endline_size = 0});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with starts with filter") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = "header this is a new line\nheader and another 
line\r\nthirdline\nheader line\n";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine("header") == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 26, .endline_size 
= 1, .matches_starts_with = true});
+  CHECK(*reader.readNextLine("header") == 
processors::detail::LineReader::LineInfo{.offset = 26, .size = 25, 
.endline_size = 2, .matches_starts_with = true});
+  CHECK(*reader.readNextLine("header") == 
processors::detail::LineReader::LineInfo{.offset = 51, .size = 10, 
.endline_size = 1, .matches_starts_with = false});
+  CHECK(*reader.readNextLine("header") == 
processors::detail::LineReader::LineInfo{.offset = 61, .size = 12, 
.endline_size = 1, .matches_starts_with = true});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+struct ExpectedSplitTextResult {
+  std::string content;
+  uint64_t fragment_index = 0;
+  uint64_t fragment_count = 0;
+  uint64_t text_line_count = 0;
+};
+
+struct SplitTextProperties {
+  uint64_t line_split_count = 0;
+  std::optional<bool> trim_trailing_newlines;
+  std::optional<uint64_t> maximum_fragment_size;
+  std::optional<uint64_t> header_line_count;
+  std::optional<std::string> header_line_marker_characters;
+};
+
+void verifySplitResults(const SingleProcessorTestController& controller, const 
ProcessorTriggerResult& trigger_results, const 
std::vector<ExpectedSplitTextResult>& expected_results) {
+  REQUIRE(trigger_results.at(processors::SplitText::Splits).size() == 
expected_results.size());
+  std::string identifier;
+  for (size_t i = 0; i < expected_results.size(); ++i) {
+    
CHECK(controller.plan->getContent(trigger_results.at(processors::SplitText::Splits)[i])
 == expected_results[i].content);
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::TextLineCountOutputAttribute.name)
 == std::to_string(expected_results[i].text_line_count));
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentSizeOutputAttribute.name)
 == std::to_string(expected_results[i].content.size()));
+    if (i > 0) {
+      
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentIdentifierOutputAttribute.name).value()
 == identifier);
+    } else {
+      identifier = 
trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentIdentifierOutputAttribute.name).value();
+      CHECK(!identifier.empty());
+    }
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(core::SpecialFlowAttribute::FILENAME)
 ==
+      "a.foo.fragment." + identifier + "." + 
std::to_string(expected_results[i].fragment_index));
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentIndexOutputAttribute.name)
 == std::to_string(expected_results[i].fragment_index));
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentCountOutputAttribute.name)
 == std::to_string(expected_results[i].fragment_count));
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::SegmentOriginalFilenameOutputAttribute.name)
 == "a.foo");
+  }
+}
+
+void runSplitTextTest(const std::string& input, const 
std::vector<ExpectedSplitTextResult>& expected_results, const 
SplitTextProperties& properties) {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  split_text->setProperty(processors::SplitText::LineSplitCount, 
std::to_string(properties.line_split_count));
+  if (properties.maximum_fragment_size) {
+    split_text->setProperty(processors::SplitText::MaximumFragmentSize, 
std::to_string(*properties.maximum_fragment_size) + " B");
+  }
+  if (properties.trim_trailing_newlines) {
+    split_text->setProperty(processors::SplitText::RemoveTrailingNewlines, 
properties.trim_trailing_newlines.value() ? "true" : "false");
+  }
+  if (properties.header_line_count) {
+    split_text->setProperty(processors::SplitText::HeaderLineCount, 
std::to_string(*properties.header_line_count));
+  }
+  if (properties.header_line_marker_characters) {
+    split_text->setProperty(processors::SplitText::HeaderLineMarkerCharacters, 
*properties.header_line_marker_characters);
+  }
+  const auto trigger_results = controller.trigger(input, 
{{std::string(core::SpecialFlowAttribute::FILENAME), "a.foo"}});
+  CHECK(trigger_results.at(processors::SplitText::Failure).empty());
+  CHECK(trigger_results.at(processors::SplitText::Original).size() == 1);
+  
CHECK(trigger_results.at(processors::SplitText::Original)[0]->getAttribute(core::SpecialFlowAttribute::FILENAME)
 == "a.foo");
+  
CHECK(controller.plan->getContent(trigger_results.at(processors::SplitText::Original)[0])
 == input);
+  verifySplitResults(controller, trigger_results, expected_results);
+}
+
+TEST_CASE("Line Split Count property is required") {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  REQUIRE_THROWS_AS(controller.trigger("", {}), minifi::Exception);
+}
+
+TEST_CASE("Line Split Count property can only be 0 if Maximum Fragment Size is 
set") {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  split_text->setProperty(processors::SplitText::LineSplitCount, "0");
+  REQUIRE_THROWS_AS(controller.trigger("", {}), minifi::Exception);
+}
+
+TEST_CASE("Maximum Fragment Size cannot be set to 0") {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  split_text->setProperty(processors::SplitText::LineSplitCount, "0");
+  split_text->setProperty(processors::SplitText::MaximumFragmentSize, "0 B");
+  REQUIRE_THROWS_AS(controller.trigger("", {}), minifi::Exception);
+}
+
+TEST_CASE("Header Line Marker Characters size cannot be equal or larger than 
split text buffer size") {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  split_text->setProperty(processors::SplitText::LineSplitCount, "1");
+  std::string 
header_marker_character(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE),
 'A');

Review Comment:
   Updated in fccf152353ae7c25a8d4944b8b0e3add15254984



##########
extensions/standard-processors/processors/SplitText.cpp:
##########
@@ -0,0 +1,383 @@
+/**
+ * @file SplitText.cpp
+ * SplitText class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SplitText.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "utils/gsl.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace detail {
+
+LineReader::LineReader(const std::shared_ptr<io::InputStream>& stream)
+    : stream_(stream) {
+  if (!stream_ || stream_->size() == 0) {
+    state_ = StreamReadState::EndOfStream;
+  }
+}
+
+uint8_t LineReader::getEndLineSize(size_t newline_index) {
+  gsl_Expects(buffer_.size() > newline_index);
+  if (buffer_[newline_index] != '\n') {
+    return 0;
+  }
+  if (newline_index == 0 || buffer_[newline_index - 1] != '\r') {
+    return 1;
+  }
+  return 2;
+}
+
+void LineReader::setLastLineInfoAttributes(uint8_t endline_size, const 
std::optional<std::string>& starts_with) {
+  const uint64_t size_from_beginning_of_stream = (current_buffer_count_ - 1) * 
SPLIT_TEXT_BUFFER_SIZE + buffer_offset_;
+  if (last_line_info_) {
+    LineInfo previous_line_info = *last_line_info_;
+    last_line_info_->offset = previous_line_info.offset + 
previous_line_info.size;
+    last_line_info_->size = size_from_beginning_of_stream - 
previous_line_info.offset - previous_line_info.size;
+    last_line_info_->endline_size = endline_size;
+    last_line_info_->matches_starts_with = true;
+  } else {
+    last_line_info_ = LineInfo{.offset = 0, .size = read_size_ - 
last_read_size_ + buffer_offset_, .endline_size = endline_size, 
.matches_starts_with = true};
+  }
+
+  if (starts_with) {
+    last_line_info_->matches_starts_with = last_line_info_->size >= 
starts_with->size() &&
+      std::equal(starts_with->begin(), starts_with->end(), buffer_.begin() + 
last_line_info_->offset, buffer_.begin() + last_line_info_->offset + 
starts_with->size());
+  }
+}
+
+bool LineReader::readNextBuffer() {
+  buffer_offset_ = 0;
+  last_read_size_ = (std::min)(gsl::narrow<size_t>(stream_->size() - 
read_size_), SPLIT_TEXT_BUFFER_SIZE);
+  const auto read_ret = 
stream_->read(as_writable_bytes(std::span(buffer_).subspan(0, 
last_read_size_)));
+  read_size_ += read_ret;
+  if (io::isError(read_ret)) {
+    state_ = StreamReadState::StreamReadError;
+    return false;
+  }
+  ++current_buffer_count_;
+  return true;
+}
+
+std::optional<LineReader::LineInfo> LineReader::finalizeLineInfo(uint8_t 
endline_size, const std::optional<std::string>& starts_with) {
+  setLastLineInfoAttributes(endline_size, starts_with);
+  if (last_line_info_->size == 0) {
+    return std::nullopt;
+  }
+  return last_line_info_;
+}
+
+std::optional<LineReader::LineInfo> LineReader::readNextLine(const 
std::optional<std::string>& starts_with) {
+  if (state_ != StreamReadState::Ok) {
+    return std::nullopt;
+  }
+
+  const auto isLastReadProcessed = [this]() { return last_read_size_ <= 
buffer_offset_; };
+  while (read_size_ < stream_->size() || !isLastReadProcessed()) {
+    if (isLastReadProcessed() && !readNextBuffer()) {
+      return std::nullopt;
+    }
+
+    for (auto i = buffer_offset_; i < last_read_size_; ++i) {
+      if (buffer_[i] == '\n') {
+        buffer_offset_ = i + 1;
+        return finalizeLineInfo(getEndLineSize(i), starts_with);
+      }
+    }
+    buffer_offset_ = last_read_size_;
+  }
+
+  state_ = StreamReadState::EndOfStream;
+  return finalizeLineInfo(0, starts_with);
+}
+
+SplitTextFragmentGenerator::SplitTextFragmentGenerator(const 
std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& 
split_text_config)
+    : line_reader_(stream),
+      split_text_config_(split_text_config) {
+}
+
+void SplitTextFragmentGenerator::finalizeFragmentOffset(Fragment& 
current_fragment) {
+  current_fragment.fragment_offset = flow_file_offset_;
+  flow_file_offset_ += current_fragment.fragment_size;
+}
+
+void SplitTextFragmentGenerator::addLineToFragment(Fragment& current_fragment, 
const LineReader::LineInfo& line) {
+  if (line.endline_size == line.size) {  // if line consists only of endline 
characters, we need to append the fragment trim size
+    current_fragment.endline_size += line.endline_size;
+  } else {
+    current_fragment.endline_size = line.endline_size;
+  }
+  current_fragment.text_line_count += line.endline_size == line.size ? 0 : 1;
+  current_fragment.fragment_size += line.size;
+}
+
+bool SplitTextFragmentGenerator::lineSizeWouldExceedMaxFragmentSize(const 
LineReader::LineInfo& line, uint64_t fragment_size) const {
+  return split_text_config_.maximum_fragment_size && fragment_size + line.size 
+ header_fragment_size_ > split_text_config_.maximum_fragment_size.value();
+}
+
+nonstd::expected<SplitTextFragmentGenerator::Fragment, std::string> 
SplitTextFragmentGenerator::createHeaderFragmentUsingLineCount() {
+  Fragment header_fragment;
+  for (uint64_t i = 0; i < split_text_config_.header_line_count; ++i) {
+    auto line = line_reader_.readNextLine();
+    if (!line) {
+      if (getState() == StreamReadState::EndOfStream) {
+        return nonstd::make_unexpected("The flow file's line count is less 
than the specified header line count!");
+      } else {
+        return nonstd::make_unexpected("Error while reading flow file 
stream!");
+      }
+    }
+    if (lineSizeWouldExceedMaxFragmentSize(*line, 
header_fragment.fragment_size)) {
+      return nonstd::make_unexpected("Header line would exceed the maximum 
fragment size!");
+    }
+
+    addLineToFragment(header_fragment, *line);
+  }
+
+  flow_file_offset_ += header_fragment.fragment_size;
+  header_fragment_size_ = header_fragment.fragment_size;
+  return header_fragment;
+}
+
+nonstd::expected<SplitTextFragmentGenerator::Fragment, std::string> 
SplitTextFragmentGenerator::createHeaderFragmentUsingHeaderMarkerCharacters() {
+  Fragment header_fragment;
+  while (auto line = 
line_reader_.readNextLine(split_text_config_.header_line_marker_characters)) {
+    if (line->size < split_text_config_.header_line_marker_characters->size() 
|| !line->matches_starts_with) {
+      buffered_line_info_ = line;
+      break;
+    }
+    if (lineSizeWouldExceedMaxFragmentSize(*line, 
header_fragment.fragment_size)) {
+      return nonstd::make_unexpected("Header line would exceed the maximum 
fragment size!");
+    }
+
+    addLineToFragment(header_fragment, *line);
+  }
+
+  flow_file_offset_ += header_fragment.fragment_size;
+  header_fragment_size_ = header_fragment.fragment_size;
+  return header_fragment;
+}
+
+nonstd::expected<SplitTextFragmentGenerator::Fragment, std::string> 
SplitTextFragmentGenerator::readHeaderFragment() {
+  gsl_Expects(flow_file_offset_ == 0);
+  if (split_text_config_.header_line_count == 0 && 
!split_text_config_.header_line_marker_characters) {
+    return nonstd::make_unexpected("No header properties were set!");
+  }

Review Comment:
   We cannot check this in `onSchedule` as headers are optional, but actually 
this is checked on line 289, so it is not needed to be checked here, although 
the `readHeaderFragment` should not be called if none of the header properties 
are set. I changed the method to have this desired behavior as part of the 
`gsl_Expects` check in fccf152353ae7c25a8d4944b8b0e3add15254984



##########
extensions/standard-processors/processors/SplitText.cpp:
##########
@@ -0,0 +1,383 @@
+/**
+ * @file SplitText.cpp
+ * SplitText class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SplitText.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "utils/gsl.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace detail {
+
+LineReader::LineReader(const std::shared_ptr<io::InputStream>& stream)
+    : stream_(stream) {
+  if (!stream_ || stream_->size() == 0) {
+    state_ = StreamReadState::EndOfStream;
+  }
+}
+
+uint8_t LineReader::getEndLineSize(size_t newline_index) {
+  gsl_Expects(buffer_.size() > newline_index);
+  if (buffer_[newline_index] != '\n') {
+    return 0;
+  }
+  if (newline_index == 0 || buffer_[newline_index - 1] != '\r') {
+    return 1;
+  }
+  return 2;
+}
+
+void LineReader::setLastLineInfoAttributes(uint8_t endline_size, const 
std::optional<std::string>& starts_with) {
+  const uint64_t size_from_beginning_of_stream = (current_buffer_count_ - 1) * 
SPLIT_TEXT_BUFFER_SIZE + buffer_offset_;
+  if (last_line_info_) {
+    LineInfo previous_line_info = *last_line_info_;
+    last_line_info_->offset = previous_line_info.offset + 
previous_line_info.size;
+    last_line_info_->size = size_from_beginning_of_stream - 
previous_line_info.offset - previous_line_info.size;
+    last_line_info_->endline_size = endline_size;
+    last_line_info_->matches_starts_with = true;
+  } else {
+    last_line_info_ = LineInfo{.offset = 0, .size = read_size_ - 
last_read_size_ + buffer_offset_, .endline_size = endline_size, 
.matches_starts_with = true};
+  }
+
+  if (starts_with) {
+    last_line_info_->matches_starts_with = last_line_info_->size >= 
starts_with->size() &&
+      std::equal(starts_with->begin(), starts_with->end(), buffer_.begin() + 
last_line_info_->offset, buffer_.begin() + last_line_info_->offset + 
starts_with->size());
+  }
+}
+
+bool LineReader::readNextBuffer() {
+  buffer_offset_ = 0;
+  last_read_size_ = (std::min)(gsl::narrow<size_t>(stream_->size() - 
read_size_), SPLIT_TEXT_BUFFER_SIZE);
+  const auto read_ret = 
stream_->read(as_writable_bytes(std::span(buffer_).subspan(0, 
last_read_size_)));
+  read_size_ += read_ret;
+  if (io::isError(read_ret)) {
+    state_ = StreamReadState::StreamReadError;
+    return false;
+  }
+  ++current_buffer_count_;
+  return true;
+}
+
+std::optional<LineReader::LineInfo> LineReader::finalizeLineInfo(uint8_t 
endline_size, const std::optional<std::string>& starts_with) {
+  setLastLineInfoAttributes(endline_size, starts_with);
+  if (last_line_info_->size == 0) {
+    return std::nullopt;
+  }
+  return last_line_info_;
+}
+
+std::optional<LineReader::LineInfo> LineReader::readNextLine(const 
std::optional<std::string>& starts_with) {
+  if (state_ != StreamReadState::Ok) {
+    return std::nullopt;
+  }
+
+  const auto isLastReadProcessed = [this]() { return last_read_size_ <= 
buffer_offset_; };
+  while (read_size_ < stream_->size() || !isLastReadProcessed()) {
+    if (isLastReadProcessed() && !readNextBuffer()) {
+      return std::nullopt;
+    }
+
+    for (auto i = buffer_offset_; i < last_read_size_; ++i) {
+      if (buffer_[i] == '\n') {
+        buffer_offset_ = i + 1;
+        return finalizeLineInfo(getEndLineSize(i), starts_with);
+      }
+    }
+    buffer_offset_ = last_read_size_;
+  }
+
+  state_ = StreamReadState::EndOfStream;
+  return finalizeLineInfo(0, starts_with);
+}
+
+SplitTextFragmentGenerator::SplitTextFragmentGenerator(const 
std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& 
split_text_config)
+    : line_reader_(stream),
+      split_text_config_(split_text_config) {
+}
+
+void SplitTextFragmentGenerator::finalizeFragmentOffset(Fragment& 
current_fragment) {
+  current_fragment.fragment_offset = flow_file_offset_;
+  flow_file_offset_ += current_fragment.fragment_size;
+}
+
+void SplitTextFragmentGenerator::addLineToFragment(Fragment& current_fragment, 
const LineReader::LineInfo& line) {
+  if (line.endline_size == line.size) {  // if line consists only of endline 
characters, we need to append the fragment trim size
+    current_fragment.endline_size += line.endline_size;
+  } else {
+    current_fragment.endline_size = line.endline_size;
+  }
+  current_fragment.text_line_count += line.endline_size == line.size ? 0 : 1;
+  current_fragment.fragment_size += line.size;
+}
+
+bool SplitTextFragmentGenerator::lineSizeWouldExceedMaxFragmentSize(const 
LineReader::LineInfo& line, uint64_t fragment_size) const {
+  return split_text_config_.maximum_fragment_size && fragment_size + line.size 
+ header_fragment_size_ > split_text_config_.maximum_fragment_size.value();
+}
+
+nonstd::expected<SplitTextFragmentGenerator::Fragment, std::string> 
SplitTextFragmentGenerator::createHeaderFragmentUsingLineCount() {
+  Fragment header_fragment;
+  for (uint64_t i = 0; i < split_text_config_.header_line_count; ++i) {
+    auto line = line_reader_.readNextLine();
+    if (!line) {
+      if (getState() == StreamReadState::EndOfStream) {
+        return nonstd::make_unexpected("The flow file's line count is less 
than the specified header line count!");
+      } else {
+        return nonstd::make_unexpected("Error while reading flow file 
stream!");
+      }
+    }
+    if (lineSizeWouldExceedMaxFragmentSize(*line, 
header_fragment.fragment_size)) {
+      return nonstd::make_unexpected("Header line would exceed the maximum 
fragment size!");
+    }
+
+    addLineToFragment(header_fragment, *line);
+  }
+
+  flow_file_offset_ += header_fragment.fragment_size;
+  header_fragment_size_ = header_fragment.fragment_size;
+  return header_fragment;
+}
+
+nonstd::expected<SplitTextFragmentGenerator::Fragment, std::string> 
SplitTextFragmentGenerator::createHeaderFragmentUsingHeaderMarkerCharacters() {
+  Fragment header_fragment;
+  while (auto line = 
line_reader_.readNextLine(split_text_config_.header_line_marker_characters)) {
+    if (line->size < split_text_config_.header_line_marker_characters->size() 
|| !line->matches_starts_with) {
+      buffered_line_info_ = line;
+      break;
+    }
+    if (lineSizeWouldExceedMaxFragmentSize(*line, 
header_fragment.fragment_size)) {
+      return nonstd::make_unexpected("Header line would exceed the maximum 
fragment size!");
+    }
+
+    addLineToFragment(header_fragment, *line);
+  }
+
+  flow_file_offset_ += header_fragment.fragment_size;
+  header_fragment_size_ = header_fragment.fragment_size;
+  return header_fragment;
+}
+
+nonstd::expected<SplitTextFragmentGenerator::Fragment, std::string> 
SplitTextFragmentGenerator::readHeaderFragment() {
+  gsl_Expects(flow_file_offset_ == 0);
+  if (split_text_config_.header_line_count == 0 && 
!split_text_config_.header_line_marker_characters) {
+    return nonstd::make_unexpected("No header properties were set!");
+  }
+
+  if (split_text_config_.header_line_count > 0) {
+    return createHeaderFragmentUsingLineCount();
+  }
+
+  return createHeaderFragmentUsingHeaderMarkerCharacters();
+}
+
+std::optional<SplitTextFragmentGenerator::Fragment> 
SplitTextFragmentGenerator::readNextFragment() {
+  Fragment current_fragment;
+  while (auto line = buffered_line_info_ ? buffered_line_info_ : 
line_reader_.readNextLine()) {
+    buffered_line_info_.reset();
+    if (lineSizeWouldExceedMaxFragmentSize(*line, 
current_fragment.fragment_size)) {
+      if (current_fragment.processed_line_count == 0) {  // first fragment 
line would be bigger than maximum fragment size (we don't have any other line 
in the fragment yet)
+        addLineToFragment(current_fragment, *line);
+      } else {
+        buffered_line_info_ = line;
+      }
+
+      finalizeFragmentOffset(current_fragment);
+      return current_fragment;
+    }
+
+    ++current_fragment.processed_line_count;
+    addLineToFragment(current_fragment, *line);
+    if (split_text_config_.line_split_count == 
current_fragment.processed_line_count) {
+      finalizeFragmentOffset(current_fragment);
+      return current_fragment;
+    }
+  }
+
+  if (current_fragment.fragment_size > 0) {
+    finalizeFragmentOffset(current_fragment);
+    return current_fragment;
+  }
+  return std::nullopt;
+}
+
+}  // namespace detail
+
+void SplitText::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void SplitText::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory>& 
/*sessionFactory*/) {
+  gsl_Expects(context);
+  split_text_config_.line_split_count = 
utils::getRequiredPropertyOrThrow<uint64_t>(*context, LineSplitCount.name);
+  logger_->log_debug("SplitText line split count: {}", 
split_text_config_.line_split_count);
+  auto max_fragment_data_size_value = 
context->getProperty<core::DataSizeValue>(MaximumFragmentSize);
+  if (max_fragment_data_size_value) {
+    split_text_config_.maximum_fragment_size = 
max_fragment_data_size_value->getValue();
+    logger_->log_debug("SplitText maximum fragment size: {}", 
split_text_config_.maximum_fragment_size.value());
+  }
+  if (split_text_config_.maximum_fragment_size && 
split_text_config_.maximum_fragment_size.value() == 0) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Maximum Fragment Size cannot 
be 0!");
+  }
+  if (split_text_config_.line_split_count == 0 && 
!split_text_config_.maximum_fragment_size) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Line Split Count is set to 0, 
but Maximum Fragment Size is not set!");
+  }
+  split_text_config_.header_line_count = 
utils::getRequiredPropertyOrThrow<uint64_t>(*context, HeaderLineCount.name);
+  logger_->log_debug("SplitText header line count: {}", 
split_text_config_.header_line_count);
+  split_text_config_.header_line_marker_characters = 
context->getProperty(HeaderLineMarkerCharacters);
+  if (split_text_config_.header_line_marker_characters && 
split_text_config_.header_line_marker_characters->size() >= 
detail::SPLIT_TEXT_BUFFER_SIZE) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("SplitText header 
line marker characters length is larger than the maximum allowed: {}", 
detail::SPLIT_TEXT_BUFFER_SIZE));
+  }
+  if (split_text_config_.header_line_marker_characters) {
+    logger_->log_debug("SplitText header line marker characters were set: {}", 
*split_text_config_.header_line_marker_characters);
+  }
+  split_text_config_.remove_trailing_new_lines = 
utils::getRequiredPropertyOrThrow<bool>(*context, RemoveTrailingNewlines.name);
+  logger_->log_debug("SplitText should remove trailing new lines: {}", 
split_text_config_.remove_trailing_new_lines ? "true" : "false");

Review Comment:
   Updated in fccf152353ae7c25a8d4944b8b0e3add15254984



##########
extensions/standard-processors/tests/unit/SplitTextTests.cpp:
##########
@@ -0,0 +1,860 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FlowFileRecord.h"
+#include "TestBase.h"
+#include "Catch.h"
+#include "processors/SplitText.h"
+#include "SingleProcessorTestController.h"
+#include "io/BufferStream.h"
+
+namespace org::apache::nifi::minifi::test {
+
+TEST_CASE("Test LineReader with nullptr") {
+  processors::detail::LineReader reader{nullptr};
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with empty stream") {
+  auto stream = std::make_shared<io::BufferStream>();
+  processors::detail::LineReader reader{nullptr};
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with trailing endline") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = "this is a new line\nand another line\r\nthirdline\n";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 19, .endline_size 
= 1});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 19, .size = 18, 
.endline_size = 2});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 37, .size = 10, 
.endline_size = 1});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader without trailing endlines") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = "this is a new line\nand another line\r\nthirdline";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 19, .endline_size 
= 1});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 19, .size = 18, 
.endline_size = 2});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 37, .size = 9, .endline_size 
= 0});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with input larger than buffer length") {
+  auto stream = std::make_shared<io::BufferStream>();
+  const auto first_line_size = 
static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5);
+  const auto second_line_size = 
static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.7);
+  std::string input = std::string(first_line_size, 'a') + "\n" + 
std::string(second_line_size, 'b') + "\n";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = first_line_size + 
1, .endline_size = 1});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = first_line_size +1 , .size = 
second_line_size + 1, .endline_size = 1});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with input of same size as buffer length") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE - 
1, 'a') + "\n" + std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2 - 
1, 'b') + "\n";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE, .endline_size = 1});
+  CHECK(*reader.readNextLine() ==
+    processors::detail::LineReader::LineInfo{.offset = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE, .size = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2, .endline_size = 1});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with input larger than buffer length without 
trailing endline") {
+  auto stream = std::make_shared<io::BufferStream>();
+  const auto first_line_size = 
static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5);
+  const auto second_line_size = 
static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.7);
+  std::string input = std::string(first_line_size, 'a') + "\n" + 
std::string(second_line_size, 'b');
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = first_line_size + 
1, .endline_size = 1});
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = first_line_size + 1, .size = 
second_line_size, .endline_size = 0});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with input of same size as buffer length without 
trailing endline") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE - 
1, 'a') + "\n" + std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2, 
'b');
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine() == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE, .endline_size = 1});
+  CHECK(*reader.readNextLine() ==
+    processors::detail::LineReader::LineInfo{.offset = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE, .size = 
processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2, .endline_size = 0});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+TEST_CASE("Test LineReader with starts with filter") {
+  auto stream = std::make_shared<io::BufferStream>();
+  std::string input = "header this is a new line\nheader and another 
line\r\nthirdline\nheader line\n";
+  stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size());
+  processors::detail::LineReader reader{stream};
+  CHECK(*reader.readNextLine("header") == 
processors::detail::LineReader::LineInfo{.offset = 0, .size = 26, .endline_size 
= 1, .matches_starts_with = true});
+  CHECK(*reader.readNextLine("header") == 
processors::detail::LineReader::LineInfo{.offset = 26, .size = 25, 
.endline_size = 2, .matches_starts_with = true});
+  CHECK(*reader.readNextLine("header") == 
processors::detail::LineReader::LineInfo{.offset = 51, .size = 10, 
.endline_size = 1, .matches_starts_with = false});
+  CHECK(*reader.readNextLine("header") == 
processors::detail::LineReader::LineInfo{.offset = 61, .size = 12, 
.endline_size = 1, .matches_starts_with = true});
+  CHECK(reader.readNextLine() == std::nullopt);
+  CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream);
+}
+
+struct ExpectedSplitTextResult {
+  std::string content;
+  uint64_t fragment_index = 0;
+  uint64_t fragment_count = 0;
+  uint64_t text_line_count = 0;
+};
+
+struct SplitTextProperties {
+  uint64_t line_split_count = 0;
+  std::optional<bool> trim_trailing_newlines;
+  std::optional<uint64_t> maximum_fragment_size;
+  std::optional<uint64_t> header_line_count;
+  std::optional<std::string> header_line_marker_characters;
+};
+
+void verifySplitResults(const SingleProcessorTestController& controller, const 
ProcessorTriggerResult& trigger_results, const 
std::vector<ExpectedSplitTextResult>& expected_results) {
+  REQUIRE(trigger_results.at(processors::SplitText::Splits).size() == 
expected_results.size());
+  std::string identifier;
+  for (size_t i = 0; i < expected_results.size(); ++i) {
+    
CHECK(controller.plan->getContent(trigger_results.at(processors::SplitText::Splits)[i])
 == expected_results[i].content);
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::TextLineCountOutputAttribute.name)
 == std::to_string(expected_results[i].text_line_count));
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentSizeOutputAttribute.name)
 == std::to_string(expected_results[i].content.size()));
+    if (i > 0) {
+      
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentIdentifierOutputAttribute.name).value()
 == identifier);
+    } else {
+      identifier = 
trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentIdentifierOutputAttribute.name).value();
+      CHECK(!identifier.empty());
+    }
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(core::SpecialFlowAttribute::FILENAME)
 ==
+      "a.foo.fragment." + identifier + "." + 
std::to_string(expected_results[i].fragment_index));
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentIndexOutputAttribute.name)
 == std::to_string(expected_results[i].fragment_index));
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::FragmentCountOutputAttribute.name)
 == std::to_string(expected_results[i].fragment_count));
+    
CHECK(trigger_results.at(processors::SplitText::Splits)[i]->getAttribute(processors::SplitText::SegmentOriginalFilenameOutputAttribute.name)
 == "a.foo");
+  }
+}
+
+void runSplitTextTest(const std::string& input, const 
std::vector<ExpectedSplitTextResult>& expected_results, const 
SplitTextProperties& properties) {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  split_text->setProperty(processors::SplitText::LineSplitCount, 
std::to_string(properties.line_split_count));
+  if (properties.maximum_fragment_size) {
+    split_text->setProperty(processors::SplitText::MaximumFragmentSize, 
std::to_string(*properties.maximum_fragment_size) + " B");
+  }
+  if (properties.trim_trailing_newlines) {
+    split_text->setProperty(processors::SplitText::RemoveTrailingNewlines, 
properties.trim_trailing_newlines.value() ? "true" : "false");
+  }
+  if (properties.header_line_count) {
+    split_text->setProperty(processors::SplitText::HeaderLineCount, 
std::to_string(*properties.header_line_count));
+  }
+  if (properties.header_line_marker_characters) {
+    split_text->setProperty(processors::SplitText::HeaderLineMarkerCharacters, 
*properties.header_line_marker_characters);
+  }
+  const auto trigger_results = controller.trigger(input, 
{{std::string(core::SpecialFlowAttribute::FILENAME), "a.foo"}});
+  CHECK(trigger_results.at(processors::SplitText::Failure).empty());
+  CHECK(trigger_results.at(processors::SplitText::Original).size() == 1);
+  
CHECK(trigger_results.at(processors::SplitText::Original)[0]->getAttribute(core::SpecialFlowAttribute::FILENAME)
 == "a.foo");
+  
CHECK(controller.plan->getContent(trigger_results.at(processors::SplitText::Original)[0])
 == input);
+  verifySplitResults(controller, trigger_results, expected_results);
+}
+
+TEST_CASE("Line Split Count property is required") {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  REQUIRE_THROWS_AS(controller.trigger("", {}), minifi::Exception);
+}
+
+TEST_CASE("Line Split Count property can only be 0 if Maximum Fragment Size is 
set") {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  split_text->setProperty(processors::SplitText::LineSplitCount, "0");
+  REQUIRE_THROWS_AS(controller.trigger("", {}), minifi::Exception);
+}
+
+TEST_CASE("Maximum Fragment Size cannot be set to 0") {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  split_text->setProperty(processors::SplitText::LineSplitCount, "0");
+  split_text->setProperty(processors::SplitText::MaximumFragmentSize, "0 B");
+  REQUIRE_THROWS_AS(controller.trigger("", {}), minifi::Exception);
+}
+
+TEST_CASE("Header Line Marker Characters size cannot be equal or larger than 
split text buffer size") {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  split_text->setProperty(processors::SplitText::LineSplitCount, "1");
+  std::string 
header_marker_character(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE),
 'A');
+  split_text->setProperty(processors::SplitText::HeaderLineMarkerCharacters, 
header_marker_character);
+  REQUIRE_THROWS_AS(controller.trigger("", {}), minifi::Exception);
+}
+
+
+TEST_CASE("SplitText only forwards empty flowfile") {
+  const auto split_text = std::make_shared<processors::SplitText>("SplitText");
+  SingleProcessorTestController controller{split_text};
+  split_text->setProperty(processors::SplitText::LineSplitCount, "1");
+  const auto trigger_results = controller.trigger("", 
{{std::string(core::SpecialFlowAttribute::FILENAME), "a.foo"}});
+  CHECK(trigger_results.at(processors::SplitText::Splits).empty());
+  CHECK(trigger_results.at(processors::SplitText::Failure).empty());
+  CHECK(trigger_results.at(processors::SplitText::Original).size() == 1);
+  
CHECK(trigger_results.at(processors::SplitText::Original)[0]->getAttribute(core::SpecialFlowAttribute::FILENAME)
 == "a.foo");
+  
CHECK(controller.plan->getContent(trigger_results.at(processors::SplitText::Original)[0]).empty());
+}
+
+TEST_CASE("SplitText creates new flow file for a single line") {
+  std::vector<ExpectedSplitTextResult> expected_results(1, 
ExpectedSplitTextResult{});
+  expected_results[0].fragment_index = 1;
+  expected_results[0].fragment_count = 1;
+  expected_results[0].text_line_count = 1;
+  std::string line;
+  SECTION("Empty line with LF endline") {
+    line = "\n";
+    expected_results[0].content = line;
+    expected_results[0].text_line_count = 0;
+  }
+  SECTION("LF endline") {
+    line = "this is a new line\n";
+    expected_results[0].content = line;
+  }
+  SECTION("CRLF endline") {
+    line = "this is a new line\r\n";
+    expected_results[0].content = line;
+  }
+  SECTION("Line size larger than buffer size") {
+    line = 
std::string(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 
1.5), 'a') + "\n";
+    expected_results[0].content = line;
+  }
+  SECTION("Content without endline is a single line") {
+    line = "this is a new line";
+    expected_results[0].content = line;
+  }
+
+  SplitTextProperties properties;
+  properties.line_split_count = 1;
+  properties.trim_trailing_newlines = false;
+  runSplitTextTest(line, expected_results, properties);
+}
+
+TEST_CASE("SplitText creates new flow file with 2 lines") {
+  std::vector<ExpectedSplitTextResult> expected_results(1, 
ExpectedSplitTextResult{});
+  expected_results[0].fragment_index = 1;
+  expected_results[0].fragment_count = 1;
+  expected_results[0].text_line_count = 2;
+  std::string input;
+  bool remove_trailing_endline = false;
+  SECTION("Only LF endlines") {
+    input = "\n\n";
+    expected_results[0].text_line_count = 0;
+    expected_results[0].content = input;
+  }
+  SECTION("LF endline") {
+    input = "this is a new line\nand another line\n";
+    expected_results[0].content = input;
+  }
+  SECTION("LF endline removing trailing endlines") {
+    input = "this is a new line\nand another line\n\n";
+    remove_trailing_endline = true;
+    expected_results[0].content = "this is a new line\nand another line";
+  }
+  SECTION("CRLF endline") {
+    input = "this is a new line\r\nand another line\r\n";
+    expected_results[0].content = input;
+  }
+  SECTION("CRLF endline removing trailing endlines") {
+    input = "this is a new line\r\nand another line\r\n\r\n";
+    remove_trailing_endline = true;
+    expected_results[0].content = "this is a new line\r\nand another line";
+  }
+  SECTION("Line size larger than buffer size") {
+    std::string 
str(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5), 'a');
+    input = str + "\n" + str + "\n";
+    expected_results[0].content = input;
+  }
+  SECTION("Line size larger than buffer size without endline at the end") {
+    std::string 
str(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5), 'a');
+    input = str + "\n" + str;
+    expected_results[0].content = input;
+  }
+
+  SplitTextProperties properties;
+  properties.line_split_count = 2;
+  properties.trim_trailing_newlines = remove_trailing_endline;
+  runSplitTextTest(input, expected_results, properties);
+}
+
+TEST_CASE("SplitText creates seperate flow files from 2 lines") {

Review Comment:
   Good catch, updated in fccf152353ae7c25a8d4944b8b0e3add15254984



##########
extensions/standard-processors/processors/SplitText.cpp:
##########
@@ -0,0 +1,383 @@
+/**
+ * @file SplitText.cpp
+ * SplitText class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SplitText.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "utils/gsl.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace detail {
+
+LineReader::LineReader(const std::shared_ptr<io::InputStream>& stream)
+    : stream_(stream) {
+  if (!stream_ || stream_->size() == 0) {
+    state_ = StreamReadState::EndOfStream;
+  }
+}
+
+uint8_t LineReader::getEndLineSize(size_t newline_index) {
+  gsl_Expects(buffer_.size() > newline_index);
+  if (buffer_[newline_index] != '\n') {
+    return 0;
+  }
+  if (newline_index == 0 || buffer_[newline_index - 1] != '\r') {
+    return 1;
+  }
+  return 2;
+}
+
+void LineReader::setLastLineInfoAttributes(uint8_t endline_size, const 
std::optional<std::string>& starts_with) {
+  const uint64_t size_from_beginning_of_stream = (current_buffer_count_ - 1) * 
SPLIT_TEXT_BUFFER_SIZE + buffer_offset_;
+  if (last_line_info_) {
+    LineInfo previous_line_info = *last_line_info_;
+    last_line_info_->offset = previous_line_info.offset + 
previous_line_info.size;
+    last_line_info_->size = size_from_beginning_of_stream - 
previous_line_info.offset - previous_line_info.size;
+    last_line_info_->endline_size = endline_size;
+    last_line_info_->matches_starts_with = true;
+  } else {
+    last_line_info_ = LineInfo{.offset = 0, .size = read_size_ - 
last_read_size_ + buffer_offset_, .endline_size = endline_size, 
.matches_starts_with = true};
+  }
+
+  if (starts_with) {
+    last_line_info_->matches_starts_with = last_line_info_->size >= 
starts_with->size() &&
+      std::equal(starts_with->begin(), starts_with->end(), buffer_.begin() + 
last_line_info_->offset, buffer_.begin() + last_line_info_->offset + 
starts_with->size());
+  }
+}
+
+bool LineReader::readNextBuffer() {
+  buffer_offset_ = 0;
+  last_read_size_ = (std::min)(gsl::narrow<size_t>(stream_->size() - 
read_size_), SPLIT_TEXT_BUFFER_SIZE);
+  const auto read_ret = 
stream_->read(as_writable_bytes(std::span(buffer_).subspan(0, 
last_read_size_)));
+  read_size_ += read_ret;
+  if (io::isError(read_ret)) {
+    state_ = StreamReadState::StreamReadError;
+    return false;
+  }
+  ++current_buffer_count_;
+  return true;
+}
+
+std::optional<LineReader::LineInfo> LineReader::finalizeLineInfo(uint8_t 
endline_size, const std::optional<std::string>& starts_with) {
+  setLastLineInfoAttributes(endline_size, starts_with);
+  if (last_line_info_->size == 0) {
+    return std::nullopt;
+  }
+  return last_line_info_;
+}
+
+std::optional<LineReader::LineInfo> LineReader::readNextLine(const 
std::optional<std::string>& starts_with) {
+  if (state_ != StreamReadState::Ok) {
+    return std::nullopt;
+  }
+
+  const auto isLastReadProcessed = [this]() { return last_read_size_ <= 
buffer_offset_; };
+  while (read_size_ < stream_->size() || !isLastReadProcessed()) {
+    if (isLastReadProcessed() && !readNextBuffer()) {
+      return std::nullopt;
+    }
+
+    for (auto i = buffer_offset_; i < last_read_size_; ++i) {
+      if (buffer_[i] == '\n') {
+        buffer_offset_ = i + 1;
+        return finalizeLineInfo(getEndLineSize(i), starts_with);
+      }
+    }
+    buffer_offset_ = last_read_size_;
+  }
+
+  state_ = StreamReadState::EndOfStream;
+  return finalizeLineInfo(0, starts_with);
+}
+
+SplitTextFragmentGenerator::SplitTextFragmentGenerator(const 
std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& 
split_text_config)
+    : line_reader_(stream),
+      split_text_config_(split_text_config) {
+}
+
+void SplitTextFragmentGenerator::finalizeFragmentOffset(Fragment& 
current_fragment) {
+  current_fragment.fragment_offset = flow_file_offset_;
+  flow_file_offset_ += current_fragment.fragment_size;
+}
+
+void SplitTextFragmentGenerator::addLineToFragment(Fragment& current_fragment, 
const LineReader::LineInfo& line) {
+  if (line.endline_size == line.size) {  // if line consists only of endline 
characters, we need to append the fragment trim size
+    current_fragment.endline_size += line.endline_size;
+  } else {
+    current_fragment.endline_size = line.endline_size;
+  }
+  current_fragment.text_line_count += line.endline_size == line.size ? 0 : 1;
+  current_fragment.fragment_size += line.size;
+}
+
+bool SplitTextFragmentGenerator::lineSizeWouldExceedMaxFragmentSize(const 
LineReader::LineInfo& line, uint64_t fragment_size) const {
+  return split_text_config_.maximum_fragment_size && fragment_size + line.size 
+ header_fragment_size_ > split_text_config_.maximum_fragment_size.value();
+}
+
+nonstd::expected<SplitTextFragmentGenerator::Fragment, std::string> 
SplitTextFragmentGenerator::createHeaderFragmentUsingLineCount() {
+  Fragment header_fragment;
+  for (uint64_t i = 0; i < split_text_config_.header_line_count; ++i) {
+    auto line = line_reader_.readNextLine();
+    if (!line) {
+      if (getState() == StreamReadState::EndOfStream) {
+        return nonstd::make_unexpected("The flow file's line count is less 
than the specified header line count!");
+      } else {
+        return nonstd::make_unexpected("Error while reading flow file 
stream!");
+      }
+    }
+    if (lineSizeWouldExceedMaxFragmentSize(*line, 
header_fragment.fragment_size)) {
+      return nonstd::make_unexpected("Header line would exceed the maximum 
fragment size!");
+    }
+
+    addLineToFragment(header_fragment, *line);
+  }
+
+  flow_file_offset_ += header_fragment.fragment_size;
+  header_fragment_size_ = header_fragment.fragment_size;
+  return header_fragment;
+}
+
+nonstd::expected<SplitTextFragmentGenerator::Fragment, std::string> 
SplitTextFragmentGenerator::createHeaderFragmentUsingHeaderMarkerCharacters() {
+  Fragment header_fragment;
+  while (auto line = 
line_reader_.readNextLine(split_text_config_.header_line_marker_characters)) {
+    if (line->size < split_text_config_.header_line_marker_characters->size() 
|| !line->matches_starts_with) {
+      buffered_line_info_ = line;
+      break;
+    }
+    if (lineSizeWouldExceedMaxFragmentSize(*line, 
header_fragment.fragment_size)) {
+      return nonstd::make_unexpected("Header line would exceed the maximum 
fragment size!");
+    }
+
+    addLineToFragment(header_fragment, *line);
+  }
+
+  flow_file_offset_ += header_fragment.fragment_size;
+  header_fragment_size_ = header_fragment.fragment_size;
+  return header_fragment;
+}
+
+nonstd::expected<SplitTextFragmentGenerator::Fragment, std::string> 
SplitTextFragmentGenerator::readHeaderFragment() {
+  gsl_Expects(flow_file_offset_ == 0);
+  if (split_text_config_.header_line_count == 0 && 
!split_text_config_.header_line_marker_characters) {
+    return nonstd::make_unexpected("No header properties were set!");
+  }
+
+  if (split_text_config_.header_line_count > 0) {
+    return createHeaderFragmentUsingLineCount();
+  }
+
+  return createHeaderFragmentUsingHeaderMarkerCharacters();
+}
+
+std::optional<SplitTextFragmentGenerator::Fragment> 
SplitTextFragmentGenerator::readNextFragment() {
+  Fragment current_fragment;
+  while (auto line = buffered_line_info_ ? buffered_line_info_ : 
line_reader_.readNextLine()) {
+    buffered_line_info_.reset();
+    if (lineSizeWouldExceedMaxFragmentSize(*line, 
current_fragment.fragment_size)) {
+      if (current_fragment.processed_line_count == 0) {  // first fragment 
line would be bigger than maximum fragment size (we don't have any other line 
in the fragment yet)
+        addLineToFragment(current_fragment, *line);
+      } else {
+        buffered_line_info_ = line;
+      }
+
+      finalizeFragmentOffset(current_fragment);
+      return current_fragment;
+    }
+
+    ++current_fragment.processed_line_count;
+    addLineToFragment(current_fragment, *line);
+    if (split_text_config_.line_split_count == 
current_fragment.processed_line_count) {
+      finalizeFragmentOffset(current_fragment);
+      return current_fragment;
+    }
+  }
+
+  if (current_fragment.fragment_size > 0) {
+    finalizeFragmentOffset(current_fragment);
+    return current_fragment;
+  }
+  return std::nullopt;
+}
+
+}  // namespace detail
+
+void SplitText::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void SplitText::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory>& 
/*sessionFactory*/) {
+  gsl_Expects(context);
+  split_text_config_.line_split_count = 
utils::getRequiredPropertyOrThrow<uint64_t>(*context, LineSplitCount.name);
+  logger_->log_debug("SplitText line split count: {}", 
split_text_config_.line_split_count);
+  auto max_fragment_data_size_value = 
context->getProperty<core::DataSizeValue>(MaximumFragmentSize);
+  if (max_fragment_data_size_value) {
+    split_text_config_.maximum_fragment_size = 
max_fragment_data_size_value->getValue();
+    logger_->log_debug("SplitText maximum fragment size: {}", 
split_text_config_.maximum_fragment_size.value());
+  }
+  if (split_text_config_.maximum_fragment_size && 
split_text_config_.maximum_fragment_size.value() == 0) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Maximum Fragment Size cannot 
be 0!");
+  }
+  if (split_text_config_.line_split_count == 0 && 
!split_text_config_.maximum_fragment_size) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Line Split Count is set to 0, 
but Maximum Fragment Size is not set!");
+  }
+  split_text_config_.header_line_count = 
utils::getRequiredPropertyOrThrow<uint64_t>(*context, HeaderLineCount.name);
+  logger_->log_debug("SplitText header line count: {}", 
split_text_config_.header_line_count);
+  split_text_config_.header_line_marker_characters = 
context->getProperty(HeaderLineMarkerCharacters);
+  if (split_text_config_.header_line_marker_characters && 
split_text_config_.header_line_marker_characters->size() >= 
detail::SPLIT_TEXT_BUFFER_SIZE) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("SplitText header 
line marker characters length is larger than the maximum allowed: {}", 
detail::SPLIT_TEXT_BUFFER_SIZE));
+  }
+  if (split_text_config_.header_line_marker_characters) {
+    logger_->log_debug("SplitText header line marker characters were set: {}", 
*split_text_config_.header_line_marker_characters);
+  }
+  split_text_config_.remove_trailing_new_lines = 
utils::getRequiredPropertyOrThrow<bool>(*context, RemoveTrailingNewlines.name);
+  logger_->log_debug("SplitText should remove trailing new lines: {}", 
split_text_config_.remove_trailing_new_lines ? "true" : "false");
+}
+
+void SplitText::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
+  gsl_Expects(context && session);
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    return;
+  }

Review Comment:
   You are right we should, although there are some inconsistencies regarding 
this in the codebase, for example RouteText does yield in this case, but 
ExtractText does not. I updated to include the yield call here in 
fccf152353ae7c25a8d4944b8b0e3add15254984, but we should review other processors 
as well later.



##########
PROCESSORS.md:
##########
@@ -2862,6 +2863,44 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | success | All files, containing log events, are routed to success |
 
 
+## SplitText
+
+### Description
+
+Splits a text file into multiple smaller text files on line boundaries limited 
by maximum number of lines or total size of fragment. Each output split file 
will contain no more than the configured number of lines or bytes. If both Line 
Split Count and Maximum Fragment Size are specified, the split occurs at 
whichever limit is reached first. If the first line of a fragment exceeds the 
Maximum Fragment Size, that line will be output in a single split file which 
exceeds the configured maximum size limit. This component also allows one to 
specify that each split should include a header lines. Header lines can be 
computed by either specifying the amount of lines that should constitute a 
header or by using header marker to match against the read lines. If such match 
happens then the corresponding line will be treated as header. Keep in mind 
that upon the first failure of header marker match, no more matches will be 
performed and the rest of the data will be parsed as regular lines for a g
 iven split. If after computation of the header there are no more data, the 
resulting split will consists of only header lines.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                          | Default Value | Allowable Values | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
      |
+|-------------------------------|---------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Line Split Count**          |               |                  | The 
number of lines that will be added to each split file, excluding header lines. 
A value of zero requires Maximum Fragment Size to be set, and line count will 
not be considered in determining splits.                                        
                                                                                
                                                                                
                                                                                
     |
+| Maximum Fragment Size         |               |                  | The 
maximum size of each split file, including header lines. NOTE: in the case 
where a single line exceeds this property (including headers, if applicable), 
that line will be output in a split of its own which exceeds this Maximum 
Fragment Size setting.                                                          
                                                                                
                                                                                
               |
+| **Header Line Count**         | 0             |                  | The 
number of lines that should be considered part of the header; the header lines 
will be duplicated to all split files.                                          
                                                                                
                                                                                
                                                                                
                                                                                
   |
+| Header Line Marker Characters |               |                  | The first 
character(s) on the line of the datafile which signifies a header line. This 
value is ignored when Header Line Count is non-zero. The first line not 
containing the Header Line Marker Characters and all subsequent lines are 
considered non-header                                                           
                                                                                
                                                                                
             |
+| **Remove Trailing Newlines**  | true          | true<br/>false   | Whether 
to remove newlines at the end of each split file. This should be false if you 
intend to merge the split files later. If this is set to 'true' and a FlowFile 
is generated that contains only 'empty lines' (i.e., consists only of and 
characters), the FlowFile will not be emitted. Note, however, that if header 
lines are specified, the resultant FlowFile will never be empty as it will 
consist of the header lines, so a FlowFile may be emitted that contains only 
the header lines. |

Review Comment:
   Good catch, updated in fccf152353ae7c25a8d4944b8b0e3add15254984



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to