This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a49fe09c12efee4ca9c5db37dc8aca80dbf7eafc
Author: Daniel Bakai <[email protected]>
AuthorDate: Sun Jun 30 19:16:21 2019 +0200

    MINIFICPP-925 - Fix TailFile hang on long lines
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #596
---
 .../processors/LogAttribute.cpp                    |  27 ++--
 .../standard-processors/processors/LogAttribute.h  |  32 ++---
 .../standard-processors/processors/TailFile.cpp    |   5 +-
 .../tests/integration/TailFileCronTest.cpp         |   2 +-
 .../tests/integration/TailFileTest.cpp             |   2 +-
 .../tests/unit/TailFileTests.cpp                   | 132 +++++++++++++++++-
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/core/Deprecated.h                |  29 ++++
 libminifi/include/core/ProcessSession.h            |   4 +-
 libminifi/src/core/ProcessSession.cpp              | 147 ++++++++++++---------
 10 files changed, 276 insertions(+), 106 deletions(-)

diff --git a/extensions/standard-processors/processors/LogAttribute.cpp 
b/extensions/standard-processors/processors/LogAttribute.cpp
index 6ffa397..cc44f25 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -135,35 +135,34 @@ void LogAttribute::onTrigger(const 
std::shared_ptr<core::ProcessContext> &contex
     }
     if (logPayload && flow->getSize() <= 1024 * 1024) {
       message << "\n" << "Payload:" << "\n";
-      ReadCallback callback(flow->getSize());
+      ReadCallback callback(logger_, flow->getSize());
       session->read(flow, &callback);
-      for (unsigned int i = 0, j = 0; i < callback.read_size_; i++) {
-        message << std::hex << callback.buffer_[i];
-        j++;
-        if (j == 80) {
-          message << '\n';
-          j = 0;
-        }
+
+      auto payload_hex = utils::StringUtils::to_hex(callback.buffer_.data(), 
callback.buffer_.size());
+      for (size_t i = 0; i < payload_hex.size(); i += 80) {
+        message << payload_hex.substr(i, 80) << '\n';
       }
+    } else {
+      message << "\n";
     }
-    message << "\n" << dashLine;
+    message << dashLine;
     std::string output = message.str();
 
     switch (level) {
       case LogAttrLevelInfo:
-        logger_->log_info("%s", output);
+        logging::LOG_INFO(logger_) << output;
         break;
       case LogAttrLevelDebug:
-        logger_->log_debug("%s", output);
+        logging::LOG_DEBUG(logger_) << output;
         break;
       case LogAttrLevelError:
-        logger_->log_error("%s", output);
+        logging::LOG_ERROR(logger_) << output;
         break;
       case LogAttrLevelTrace:
-        logger_->log_trace("%s", output);
+        logging::LOG_TRACE(logger_) << output;
         break;
       case LogAttrLevelWarn:
-        logger_->log_warn("%s", output);
+        logging::LOG_WARN(logger_) << output;
         break;
       default:
         break;
diff --git a/extensions/standard-processors/processors/LogAttribute.h 
b/extensions/standard-processors/processors/LogAttribute.h
index 5d03e11..a8f00eb 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -89,27 +89,23 @@ class LogAttribute : public core::Processor {
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(uint64_t size)
-        : read_size_(0) {
-      buffer_size_ = size;
-      buffer_ = new uint8_t[buffer_size_];
-    }
-    ~ReadCallback() {
-      if (buffer_)
-        delete[] buffer_;
+    ReadCallback(std::shared_ptr<logging::Logger> logger, uint64_t size)  // preallocates `size` bytes; process() must fill all of them
+        : logger_(std::move(logger))
+        , buffer_(size) {
     }
     int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      int64_t ret = 0;
-      ret = stream->read(buffer_, buffer_size_);
-      if (!stream)
-        read_size_ = stream->getSize();
-      else
-        read_size_ = buffer_size_;
-      return ret;
+      if (buffer_.size() == 0U) {  // empty content: nothing to read
+        return 0;
+      }
+      const int ret = stream->read(buffer_.data(), buffer_.size());
+      if (ret < 0 || static_cast<size_t>(ret) != buffer_.size()) {  // reject errors (negative) and short reads without a signed/unsigned compare
+        logger_->log_error("%zu bytes were requested from the stream but %d 
bytes were read. Rolling back.", buffer_.size(), ret);
+        throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire 
FlowFile.");
+      }
+      return static_cast<int64_t>(buffer_.size());
     }
-    uint8_t *buffer_;
-    uint64_t buffer_size_;
-    uint64_t read_size_;
+    std::shared_ptr<logging::Logger> logger_;
+    std::vector<uint8_t> buffer_;
   };
 
  public:
diff --git a/extensions/standard-processors/processors/TailFile.cpp 
b/extensions/standard-processors/processors/TailFile.cpp
index e6f1824..2fd0861 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -72,7 +72,8 @@ core::Property TailFile::StateFile("State File", "Specifies 
the file that should
                                    " what data has been ingested so that upon 
restart NiFi can resume from where it left off",
                                    "TailFileState");
 core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-                                   "from the incoming file.",
+                                   " from the incoming file. "
+                                   "If none is specified, data will be 
ingested as it becomes available.",
                                    "");
 
 core::Property TailFile::TailMode(
@@ -416,7 +417,7 @@ void TailFile::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, c
         }
         logger_->log_debug("Looking for delimiter 0x%X", delim);
         std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
-        session->import(fullPath, flowFiles, true, 
state.second.currentTailFilePosition_, delim);
+        session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
         logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
 
         for (auto ffr : flowFiles) {
diff --git 
a/extensions/standard-processors/tests/integration/TailFileCronTest.cpp 
b/extensions/standard-processors/tests/integration/TailFileCronTest.cpp
index 2b5ee77..589c92c 100644
--- a/extensions/standard-processors/tests/integration/TailFileCronTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileCronTest.cpp
@@ -73,7 +73,7 @@ class TailFileTestHarness : public IntegrationBase {
   virtual void runAssertions() {
     assert(LogTestController::getInstance().contains("5 flowfiles were 
received from TailFile input") == true);
     assert(LogTestController::getInstance().contains("Looking for delimiter 
0xA") == true);
-    assert(LogTestController::getInstance().contains("li\\ne5") == true);
+    
assert(LogTestController::getInstance().contains(utils::StringUtils::to_hex("li\\ne5"))
 == true);
   }
 
  protected:
diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp 
b/extensions/standard-processors/tests/integration/TailFileTest.cpp
index 2b5ee77..589c92c 100644
--- a/extensions/standard-processors/tests/integration/TailFileTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp
@@ -73,7 +73,7 @@ class TailFileTestHarness : public IntegrationBase {
   virtual void runAssertions() {
     assert(LogTestController::getInstance().contains("5 flowfiles were 
received from TailFile input") == true);
     assert(LogTestController::getInstance().contains("Looking for delimiter 
0xA") == true);
-    assert(LogTestController::getInstance().contains("li\\ne5") == true);
+    
assert(LogTestController::getInstance().contains(utils::StringUtils::to_hex("li\\ne5"))
 == true);
   }
 
  protected:
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp 
b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 71a11bc..416d3cf 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -25,6 +25,9 @@
 #include <string>
 #include <iostream>
 #include <set>
+#include <algorithm>
+#include <random>
+#include <cstdlib>
 #include "FlowController.h"
 #include "TestBase.h"
 #include "core/Core.h"
@@ -70,10 +73,13 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
 
   testController.runSession(plan, false);
   auto records = plan->getProvenanceRecords();
-  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
-  REQUIRE(record == nullptr);
   REQUIRE(records.size() == 2);
 
+  testController.runSession(plan, false);
+
+  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+  REQUIRE(LogTestController::getInstance().contains("Size:" + 
std::to_string(NEWLINE_FILE.find_first_of('\n')) + " Offset:0"));
+
   LogTestController::getInstance().reset();
 
   // Delete the test and state file.
@@ -323,6 +329,8 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
   // Create and write to the test file
 
   TestController testController;
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
   
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
 
   std::shared_ptr<TestPlan> plan = testController.createPlan();
@@ -354,12 +362,128 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") 
{
 
   testController.runSession(plan, false);
   auto records = plan->getProvenanceRecords();
-  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
-  REQUIRE(record == nullptr);
   REQUIRE(records.size() == 2);
 
   testController.runSession(plan, false);
 
+  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+  REQUIRE(LogTestController::getInstance().contains("Size:" + 
std::to_string(NEWLINE_FILE.size()) + " Offset:0"));
+
+  LogTestController::getInstance().reset();
+
+  // Delete the test and state file.
+  remove(TMP_FILE);
+  remove(STATE_FILE);
+}
+
+TEST_CASE("TailFileLongWithDelimiter", "[tailfiletest2]") {
+  std::string line1("foo");
+  std::string line2(8050, 0);  // longer than two read buffers (getpagesize(), typically 4 KiB) so the line spans several reads
+  std::mt19937 gen(std::random_device { }());
+  std::generate_n(line2.begin(), line2.size(), [&]() -> char {
+    return 32 + gen() % (127 - 32);
+  });
+  std::string line3("bar");
+  std::string line4("buzz");
+
+  // Create and write to the test file
+  std::ofstream tmpfile;
+  tmpfile.open(TMP_FILE);
+  tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
+  tmpfile.close();
+
+  TestController testController;
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<core::ProcessSession>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", 
"tailfileProc");
+
+  char format[] = "/tmp/gt.XXXXXX";
+  testController.createTempDirectory(format);
+
+  plan->setProperty(tailfile, 
org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+  plan->setProperty(tailfile, 
org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), 
STATE_FILE);
+  plan->setProperty(tailfile, 
org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  std::shared_ptr<core::Processor> log_attr = 
plan->addProcessor("LogAttribute", "Log", core::Relationship("success", 
"description"), true);
+  plan->setProperty(log_attr, 
processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), 
"true");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files"));
+  
REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line1)));
+  auto line2_hex = utils::StringUtils::to_hex(line2);
+  std::stringstream line2_hex_lines;
+  for (size_t i = 0; i < line2_hex.size(); i += 80) {
+    line2_hex_lines << line2_hex.substr(i, 80) << '\n';
+  }
+  REQUIRE(LogTestController::getInstance().contains(line2_hex_lines.str()));
+  
REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line3)));
+  REQUIRE(false == 
LogTestController::getInstance().contains(utils::StringUtils::to_hex(line4), 
std::chrono::seconds(0)));
+
+
+  LogTestController::getInstance().reset();
+
+  // Delete the test and state file.
+  remove(TMP_FILE);
+  remove(STATE_FILE);
+}
+
+TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
+  // Test having two delimiters on the buffer boundary
+  std::string line1(4097, '\n');
+  std::mt19937 gen(std::random_device { }());
+  std::generate_n(line1.begin(), 4095, [&]() -> char {
+  return 32 + gen() % (127 - 32);
+  });
+  std::string line2("foo");
+  std::string line3("bar");
+  std::string line4("buzz");
+
+  // Create and write to the test file
+  std::ofstream tmpfile;
+  tmpfile.open(TMP_FILE);
+  tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
+  tmpfile.close();
+
+  TestController testController;
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<core::ProcessSession>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", 
"tailfileProc");
+  auto id = tailfile->getUUIDStr();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  plan->setProperty(tailfile, 
org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+  plan->setProperty(tailfile, 
org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), 
STATE_FILE);
+  plan->setProperty(tailfile, 
org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  std::shared_ptr<core::Processor> log_attr = 
plan->addProcessor("LogAttribute", "Log", core::Relationship("success", 
"description"), true);
+  plan->setProperty(log_attr, 
processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), 
"true");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Logged 5 flow files"));
+  auto line1_hex = utils::StringUtils::to_hex(line1.substr(0, 4095));
+  std::stringstream line1_hex_lines;
+  for (size_t i = 0; i < line1_hex.size(); i += 80) {
+    line1_hex_lines << line1_hex.substr(i, 80) << '\n';
+  }
+  REQUIRE(LogTestController::getInstance().contains(line1_hex_lines.str()));
+  
REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line2)));
+  
REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line3)));
+  REQUIRE(false == 
LogTestController::getInstance().contains(utils::StringUtils::to_hex(line4), 
std::chrono::seconds(0)));
+
   LogTestController::getInstance().reset();
 
   // Delete the test and state file.
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index dd91442..9959890 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -189,4 +189,4 @@ endif()
 set_property(TARGET core-minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
 set_property(TARGET minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
 #endif()
-endif(ENABLE_PYTHON AND NOT STATIC_BUILD)
\ No newline at end of file
+endif(ENABLE_PYTHON AND NOT STATIC_BUILD)
diff --git a/libminifi/include/core/Deprecated.h 
b/libminifi/include/core/Deprecated.h
new file mode 100644
index 0000000..faed844
--- /dev/null
+++ b/libminifi/include/core/Deprecated.h
@@ -0,0 +1,29 @@
+/**
+
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_DEPRECATED_H_
+#define LIBMINIFI_INCLUDE_CORE_DEPRECATED_H_
+/* DEPRECATED: put before a declaration so the compiler warns wherever that declaration is used. */
+#ifdef _MSC_VER
+#define DEPRECATED __declspec(deprecated)
+#elif defined(__GNUC__) || defined(__clang__)
+#define DEPRECATED __attribute__((__deprecated__))
+#else
+#define DEPRECATED /* unknown compiler: expands to nothing, so no warning is emitted */
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CORE_DEPRECATED_H_ */
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index 52462e9..7bcd7f5 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -31,6 +31,7 @@
 #include "FlowFileRecord.h"
 #include "Exception.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/Deprecated.h"
 #include "FlowFile.h"
 #include "WeakReference.h"
 #include "provenance/Provenance.h"
@@ -110,7 +111,8 @@ class ProcessSession : public ReferenceContainer {
   void importFrom(io::DataStream &stream, const 
std::shared_ptr<core::FlowFile> &flow);
   // import from the data source.
   void import(std::string source, const std::shared_ptr<core::FlowFile> &flow, 
bool keepSource = true, uint64_t offset = 0);
-  void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> 
&flows, bool keepSource, uint64_t offset, char inputDelimiter);
+  DEPRECATED void import(std::string source, 
std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t 
offset, char inputDelimiter);
+  void import(const std::string& source, 
std::vector<std::shared_ptr<FlowFileRecord>> &flows, uint64_t offset, char 
inputDelimiter);
 
   /**
    * Exports the data stream to a file
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 40ee900..42271b8 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -384,8 +384,7 @@ void ProcessSession::read(const 
std::shared_ptr<core::FlowFile> &flow, InputStre
 void ProcessSession::importFrom(io::DataStream &stream, const 
std::shared_ptr<core::FlowFile> &flow) {
   std::shared_ptr<ResourceClaim> claim = 
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
   size_t max_read = getpagesize();
-  std::vector<uint8_t> charBuffer;
-  charBuffer.resize(max_read);
+  std::vector<uint8_t> charBuffer(max_read);
 
   try {
     auto startTime = getTimeMillis();
@@ -450,12 +449,10 @@ void ProcessSession::importFrom(io::DataStream &stream, 
const std::shared_ptr<co
 
 void ProcessSession::import(std::string source, const 
std::shared_ptr<core::FlowFile> &flow, bool keepSource, uint64_t offset) {
   std::shared_ptr<ResourceClaim> claim = 
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-  int size = getpagesize();
-  std::vector<uint8_t> charBuffer;
-  charBuffer.resize(size);
+  size_t size = getpagesize();
+  std::vector<uint8_t> charBuffer(size);
 
   try {
-    //  std::ofstream fs;
     auto startTime = getTimeMillis();
     std::ifstream input;
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
@@ -537,63 +534,82 @@ void ProcessSession::import(std::string source, const 
std::shared_ptr<core::Flow
   }
 }
 
-void ProcessSession::import(std::string source, 
std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t 
offset, char inputDelimiter) {
+void ProcessSession::import(const std::string& source, 
std::vector<std::shared_ptr<FlowFileRecord>> &flows, uint64_t offset, char 
inputDelimiter) {
   std::shared_ptr<ResourceClaim> claim;
+  std::shared_ptr<io::BaseStream> stream;
   std::shared_ptr<FlowFileRecord> flowFile;
-  int size = getpagesize();
-  std::vector<char> charBuffer;
-  charBuffer.resize(size);
 
+  std::vector<uint8_t> buffer(getpagesize());
   try {
-    // Open the input file and seek to the appropriate location.
-    std::ifstream input;
-    logger_->log_debug("Opening %s", source);
-    input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-    if (input.is_open() && input.good()) {
-      if (offset != 0) {
+    try {
+      std::ifstream input;
+      logger_->log_debug("Opening %s", source);
+      input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+      if (!input.is_open() || !input.good()) {
+        input.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+      }
+      if (offset != 0U) {
         input.seekg(offset, input.beg);
         if (!input.good()) {
-          logger_->log_error("Seeking to %d failed for file %s (does 
file/filesystem support seeking?)", offset, source);
+          logger_->log_error("Seeking to %lu failed for file %s (does 
file/filesystem support seeking?)", offset, source);
           throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
         }
       }
-      while (input.good() && !input.eof()) {
-        bool invalidWrite = false;
-        uint64_t startTime = getTimeMillis();
-        input.getline(charBuffer.data(), size, inputDelimiter);
-
-        if (input.eof() || input.fail()) {
+      uint64_t startTime = 0U;
+      while (input.good()) {
+        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+        std::streamsize read = input.gcount();
+        if (read < 0) {
+          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount 
returned negative value");
+        }
+        if (read == 0) {
           logger_->log_trace("Finished reading input %s", source);
           break;
         }
-        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-        claim = 
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-
-        size_t bufsize = strlen(charBuffer.data());
-        std::shared_ptr<io::BaseStream> stream = 
process_context_->getContentRepository()->write(claim);
-        if (nullptr == stream) {
-          logger_->log_debug("Stream is null");
-          rollback();
-          return;
-        }
-
-        if (input.good()) {
-          if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), 
bufsize) < 0) {
-            invalidWrite = true;
+        uint8_t* begin = buffer.data();
+        uint8_t* end = begin + read;
+        while (true) {
+          uint8_t* delimiterPos = std::find(begin, end, 
static_cast<uint8_t>(inputDelimiter));
+          int len = delimiterPos - begin;
+
+          /*
+           * We do not want to process the rest of the buffer after the last 
delimiter if
+           *  - we have reached EOF in the file (we would discard it anyway)
+           *  - there is nothing to process (the last character in the buffer 
is a delimiter)
+           */
+          if (delimiterPos == end && (input.eof() || len == 0)) {
             break;
           }
-        } else {
-          if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), 
input.gcount()) < 0) {
-            invalidWrite = true;
-            break;
+
+          /* Create claim and stream if needed and append data */
+          if (claim == nullptr) {
+            startTime = getTimeMillis();
+            claim = 
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+          }
+          if (stream == nullptr) {
+            stream = process_context_->getContentRepository()->write(claim);
+          }
+          if (stream == nullptr) {
+            logger_->log_error("Stream is null");
+            rollback();
+            return;
+          }
+          if (stream->write(begin, len) != len) {
+            logger_->log_error("Error while writing");
+            stream->closeStream();
+            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error 
creating Flowfile");
           }
-        }
 
-        if (!invalidWrite) {
+          /* Create a FlowFile if we reached a delimiter */
+          if (delimiterPos == end) {
+            break;
+          }
+          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
           flowFile->setSize(stream->getSize());
           flowFile->setOffset(0);
           if (flowFile->getResourceClaim() != nullptr) {
-            // Remove the old claim
+            /* Remove the old claim */
             flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
             flowFile->clearResourceClaim();
           }
@@ -606,37 +622,40 @@ void ProcessSession::import(std::string source, 
std::vector<std::shared_ptr<Flow
           uint64_t endTime = getTimeMillis();
           provenance_report_->modifyContent(flowFile, details, endTime - 
startTime);
           flows.push_back(flowFile);
-        } else {
-          logger_->log_debug("Error while writing");
-          stream->closeStream();
-          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error 
creating Flowfile");
+
+          /* Reset these to start processing the next FlowFile with a clean 
slate */
+          flowFile.reset();
+          stream.reset();
+          claim.reset();
+
+          /* Skip delimiter */
+          begin = delimiterPos + 1;
         }
       }
-      input.close();
-      logger_->log_trace("Closed input %s, keeping source ? %i", source, 
keepSource);
-      if (!keepSource)
-        std::remove(source.c_str());
-    } else {
-      input.close();
-      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    } catch (std::exception &exception) {
+      logger_->log_debug("Caught Exception %s", exception.what());
+      throw;
+    } catch (...) {
+      logger_->log_debug("Caught Exception during process session write");
+      throw;
     }
-  } catch (std::exception &exception) {
-    if (flowFile && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }
-    logger_->log_debug("Caught Exception %s", exception.what());
-    throw;
   } catch (...) {
-    if (flowFile && flowFile->getResourceClaim() == claim) {
+    if (flowFile != nullptr && claim != nullptr && 
flowFile->getResourceClaim() == claim) {
       flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
       flowFile->clearResourceClaim();
     }
-    logger_->log_debug("Caught Exception during process session write");
     throw;
   }
 }
 
+void ProcessSession::import(std::string source, 
std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t 
offset, char inputDelimiter) {
+  import(source, flows, offset, inputDelimiter);  // delegate to the non-deprecated overload; source deletion is handled here
+  logger_->log_trace("Closed input %s, keeping source ? %i", source, 
keepSource);
+  if (!keepSource && std::remove(source.c_str()) != 0) {
+    logger_->log_warn("Failed to remove source file %s after import", source);
+  }
+}
+
 bool ProcessSession::exportContent(const std::string &destination, const 
std::string &tmpFile, const std::shared_ptr<core::FlowFile> &flow, bool 
keepContent) {
   logger_->log_debug("Exporting content of %s to %s", flow->getUUIDStr(), 
destination);
 

Reply via email to