This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a49fe09c12efee4ca9c5db37dc8aca80dbf7eafc Author: Daniel Bakai <[email protected]> AuthorDate: Sun Jun 30 19:16:21 2019 +0200 MINIFICPP-925 - Fix TailFile hang on long lines Signed-off-by: Arpad Boda <[email protected]> This closes #596 --- .../processors/LogAttribute.cpp | 27 ++-- .../standard-processors/processors/LogAttribute.h | 32 ++--- .../standard-processors/processors/TailFile.cpp | 5 +- .../tests/integration/TailFileCronTest.cpp | 2 +- .../tests/integration/TailFileTest.cpp | 2 +- .../tests/unit/TailFileTests.cpp | 132 +++++++++++++++++- libminifi/CMakeLists.txt | 2 +- libminifi/include/core/Deprecated.h | 29 ++++ libminifi/include/core/ProcessSession.h | 4 +- libminifi/src/core/ProcessSession.cpp | 147 ++++++++++++--------- 10 files changed, 276 insertions(+), 106 deletions(-) diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp index 6ffa397..cc44f25 100644 --- a/extensions/standard-processors/processors/LogAttribute.cpp +++ b/extensions/standard-processors/processors/LogAttribute.cpp @@ -135,35 +135,34 @@ void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &contex } if (logPayload && flow->getSize() <= 1024 * 1024) { message << "\n" << "Payload:" << "\n"; - ReadCallback callback(flow->getSize()); + ReadCallback callback(logger_, flow->getSize()); session->read(flow, &callback); - for (unsigned int i = 0, j = 0; i < callback.read_size_; i++) { - message << std::hex << callback.buffer_[i]; - j++; - if (j == 80) { - message << '\n'; - j = 0; - } + + auto payload_hex = utils::StringUtils::to_hex(callback.buffer_.data(), callback.buffer_.size()); + for (size_t i = 0; i < payload_hex.size(); i += 80) { + message << payload_hex.substr(i, 80) << '\n'; } + } else { + message << "\n"; } - message << "\n" << dashLine; + message << dashLine; std::string output = message.str(); switch (level) { case LogAttrLevelInfo: - logger_->log_info("%s", output); + 
logging::LOG_INFO(logger_) << output; break; case LogAttrLevelDebug: - logger_->log_debug("%s", output); + logging::LOG_DEBUG(logger_) << output; break; case LogAttrLevelError: - logger_->log_error("%s", output); + logging::LOG_ERROR(logger_) << output; break; case LogAttrLevelTrace: - logger_->log_trace("%s", output); + logging::LOG_TRACE(logger_) << output; break; case LogAttrLevelWarn: - logger_->log_warn("%s", output); + logging::LOG_WARN(logger_) << output; break; default: break; diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h index 5d03e11..a8f00eb 100644 --- a/extensions/standard-processors/processors/LogAttribute.h +++ b/extensions/standard-processors/processors/LogAttribute.h @@ -89,27 +89,23 @@ class LogAttribute : public core::Processor { // Nest Callback Class for read stream class ReadCallback : public InputStreamCallback { public: - ReadCallback(uint64_t size) - : read_size_(0) { - buffer_size_ = size; - buffer_ = new uint8_t[buffer_size_]; - } - ~ReadCallback() { - if (buffer_) - delete[] buffer_; + ReadCallback(std::shared_ptr<logging::Logger> logger, uint64_t size) + : logger_(std::move(logger)) + , buffer_(size) { } int64_t process(std::shared_ptr<io::BaseStream> stream) { - int64_t ret = 0; - ret = stream->read(buffer_, buffer_size_); - if (!stream) - read_size_ = stream->getSize(); - else - read_size_ = buffer_size_; - return ret; + if (buffer_.size() == 0U) { + return 0U; + } + int ret = stream->read(buffer_.data(), buffer_.size()); + if (ret != buffer_.size()) { + logger_->log_error("%zu bytes were requested from the stream but %d bytes were read. 
Rolling back.", buffer_.size(), ret); + throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile."); + } + return buffer_.size(); } - uint8_t *buffer_; - uint64_t buffer_size_; - uint64_t read_size_; + std::shared_ptr<logging::Logger> logger_; + std::vector<uint8_t> buffer_; }; public: diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp index e6f1824..2fd0861 100644 --- a/extensions/standard-processors/processors/TailFile.cpp +++ b/extensions/standard-processors/processors/TailFile.cpp @@ -72,7 +72,8 @@ core::Property TailFile::StateFile("State File", "Specifies the file that should " what data has been ingested so that upon restart NiFi can resume from where it left off", "TailFileState"); core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed" - "from the incoming file.", + "from the incoming file." + "If none is specified, data will be ingested as it becomes available.", ""); core::Property TailFile::TailMode( @@ -416,7 +417,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c } logger_->log_debug("Looking for delimiter 0x%X", delim); std::vector<std::shared_ptr<FlowFileRecord>> flowFiles; - session->import(fullPath, flowFiles, true, state.second.currentTailFilePosition_, delim); + session->import(fullPath, flowFiles, state.second.currentTailFilePosition_, delim); logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size()); for (auto ffr : flowFiles) { diff --git a/extensions/standard-processors/tests/integration/TailFileCronTest.cpp b/extensions/standard-processors/tests/integration/TailFileCronTest.cpp index 2b5ee77..589c92c 100644 --- a/extensions/standard-processors/tests/integration/TailFileCronTest.cpp +++ b/extensions/standard-processors/tests/integration/TailFileCronTest.cpp @@ -73,7 +73,7 @@ class TailFileTestHarness : public 
IntegrationBase { virtual void runAssertions() { assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true); assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true); - assert(LogTestController::getInstance().contains("li\\ne5") == true); + assert(LogTestController::getInstance().contains(utils::StringUtils::to_hex("li\\ne5")) == true); } protected: diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp b/extensions/standard-processors/tests/integration/TailFileTest.cpp index 2b5ee77..589c92c 100644 --- a/extensions/standard-processors/tests/integration/TailFileTest.cpp +++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp @@ -73,7 +73,7 @@ class TailFileTestHarness : public IntegrationBase { virtual void runAssertions() { assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true); assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true); - assert(LogTestController::getInstance().contains("li\\ne5") == true); + assert(LogTestController::getInstance().contains(utils::StringUtils::to_hex("li\\ne5")) == true); } protected: diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp index 71a11bc..416d3cf 100644 --- a/extensions/standard-processors/tests/unit/TailFileTests.cpp +++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp @@ -25,6 +25,9 @@ #include <string> #include <iostream> #include <set> +#include <algorithm> +#include <random> +#include <cstdlib> #include "FlowController.h" #include "TestBase.h" #include "core/Core.h" @@ -70,10 +73,13 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") { testController.runSession(plan, false); auto records = plan->getProvenanceRecords(); - std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); - REQUIRE(record == nullptr); 
REQUIRE(records.size() == 2); + testController.runSession(plan, false); + + REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); + REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n')) + " Offset:0")); + LogTestController::getInstance().reset(); // Delete the test and state file. @@ -323,6 +329,8 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") { // Create and write to the test file TestController testController; + LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); std::shared_ptr<TestPlan> plan = testController.createPlan(); @@ -354,12 +362,128 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") { testController.runSession(plan, false); auto records = plan->getProvenanceRecords(); - std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); - REQUIRE(record == nullptr); REQUIRE(records.size() == 2); testController.runSession(plan, false); + REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); + REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.size()) + " Offset:0")); + + LogTestController::getInstance().reset(); + + // Delete the test and state file. 
+ remove(TMP_FILE); + remove(STATE_FILE); +} + +TEST_CASE("TailFileLongWithDelimiter", "[tailfiletest2]") { + std::string line1("foo"); + std::string line2(8050, 0); + std::mt19937 gen(std::random_device { }()); + std::generate_n(line2.begin(), line2.size(), [&]() -> char { + return 32 + gen() % (127 - 32); + }); + std::string line3("bar"); + std::string line4("buzz"); + + // Create and write to the test file + std::ofstream tmpfile; + tmpfile.open(TMP_FILE); + tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4; + tmpfile.close(); + + TestController testController; + LogTestController::getInstance().setTrace<TestPlan>(); + LogTestController::getInstance().setTrace<processors::TailFile>(); + LogTestController::getInstance().setTrace<processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc"); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE); + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE); + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n"); + + std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); + plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0"); + plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true"); + + testController.runSession(plan, true); + + REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files")); + REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line1))); + auto line2_hex = 
utils::StringUtils::to_hex(line2); + std::stringstream line2_hex_lines; + for (size_t i = 0; i < line2_hex.size(); i += 80) { + line2_hex_lines << line2_hex.substr(i, 80) << '\n'; + } + REQUIRE(LogTestController::getInstance().contains(line2_hex_lines.str())); + REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line3))); + REQUIRE(false == LogTestController::getInstance().contains(utils::StringUtils::to_hex(line4), std::chrono::seconds(0))); + + + LogTestController::getInstance().reset(); + + // Delete the test and state file. + remove(TMP_FILE); + remove(STATE_FILE); +} + +TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") { + // Test having two delimiters on the buffer boundary + std::string line1(4097, '\n'); + std::mt19937 gen(std::random_device { }()); + std::generate_n(line1.begin(), 4095, [&]() -> char { + return 32 + gen() % (127 - 32); + }); + std::string line2("foo"); + std::string line3("bar"); + std::string line4("buzz"); + + // Create and write to the test file + std::ofstream tmpfile; + tmpfile.open(TMP_FILE); + tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4; + tmpfile.close(); + + TestController testController; + LogTestController::getInstance().setTrace<TestPlan>(); + LogTestController::getInstance().setTrace<processors::TailFile>(); + LogTestController::getInstance().setTrace<processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc"); + auto id = tailfile->getUUIDStr(); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE); + plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE); + 
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n"); + + std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); + plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0"); + plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true"); + + testController.runSession(plan, true); + + REQUIRE(LogTestController::getInstance().contains("Logged 5 flow files")); + auto line1_hex = utils::StringUtils::to_hex(line1.substr(0, 4095)); + std::stringstream line1_hex_lines; + for (size_t i = 0; i < line1_hex.size(); i += 80) { + line1_hex_lines << line1_hex.substr(i, 80) << '\n'; + } + REQUIRE(LogTestController::getInstance().contains(line1_hex_lines.str())); + REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line2))); + REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line3))); + REQUIRE(false == LogTestController::getInstance().contains(utils::StringUtils::to_hex(line4), std::chrono::seconds(0))); + LogTestController::getInstance().reset(); // Delete the test and state file. diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index dd91442..9959890 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -189,4 +189,4 @@ endif() set_property(TARGET core-minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON) set_property(TARGET minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON) #endif() -endif(ENABLE_PYTHON AND NOT STATIC_BUILD) \ No newline at end of file +endif(ENABLE_PYTHON AND NOT STATIC_BUILD) diff --git a/libminifi/include/core/Deprecated.h b/libminifi/include/core/Deprecated.h new file mode 100644 index 0000000..faed844 --- /dev/null +++ b/libminifi/include/core/Deprecated.h @@ -0,0 +1,29 @@ +/** + + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_DEPRECATED_H_ +#define LIBMINIFI_INCLUDE_CORE_DEPRECATED_H_ + +#ifdef _MSC_VER +#define DEPRECATED __declspec(deprecated) +#elif defined(__GNUC__) | defined(__clang__) +#define DEPRECATED __attribute__((__deprecated__)) +#else +#define DEPRECATED +#endif + +#endif /* LIBMINIFI_INCLUDE_CORE_DEPRECATED_H_ */ diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 52462e9..7bcd7f5 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -31,6 +31,7 @@ #include "FlowFileRecord.h" #include "Exception.h" #include "core/logging/LoggerConfiguration.h" +#include "core/Deprecated.h" #include "FlowFile.h" #include "WeakReference.h" #include "provenance/Provenance.h" @@ -110,7 +111,8 @@ class ProcessSession : public ReferenceContainer { void importFrom(io::DataStream &stream, const std::shared_ptr<core::FlowFile> &flow); // import from the data source. 
void import(std::string source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0); - void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter); + DEPRECATED void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter); + void import(const std::string& source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, uint64_t offset, char inputDelimiter); /** * Exports the data stream to a file diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 40ee900..42271b8 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -384,8 +384,7 @@ void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStre void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<core::FlowFile> &flow) { std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); size_t max_read = getpagesize(); - std::vector<uint8_t> charBuffer; - charBuffer.resize(max_read); + std::vector<uint8_t> charBuffer(max_read); try { auto startTime = getTimeMillis(); @@ -450,12 +449,10 @@ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<co void ProcessSession::import(std::string source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource, uint64_t offset) { std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); - int size = getpagesize(); - std::vector<uint8_t> charBuffer; - charBuffer.resize(size); + size_t size = getpagesize(); + std::vector<uint8_t> charBuffer(size); try { - // std::ofstream fs; auto startTime = getTimeMillis(); std::ifstream input; input.open(source.c_str(), std::fstream::in | std::fstream::binary); @@ -537,63 +534,82 @@ void 
ProcessSession::import(std::string source, const std::shared_ptr<core::Flow } } -void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) { +void ProcessSession::import(const std::string& source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, uint64_t offset, char inputDelimiter) { std::shared_ptr<ResourceClaim> claim; + std::shared_ptr<io::BaseStream> stream; std::shared_ptr<FlowFileRecord> flowFile; - int size = getpagesize(); - std::vector<char> charBuffer; - charBuffer.resize(size); + std::vector<uint8_t> buffer(getpagesize()); try { - // Open the input file and seek to the appropriate location. - std::ifstream input; - logger_->log_debug("Opening %s", source); - input.open(source.c_str(), std::fstream::in | std::fstream::binary); - if (input.is_open() && input.good()) { - if (offset != 0) { + try { + std::ifstream input; + logger_->log_debug("Opening %s", source); + input.open(source.c_str(), std::fstream::in | std::fstream::binary); + if (!input.is_open() || !input.good()) { + input.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + if (offset != 0U) { input.seekg(offset, input.beg); if (!input.good()) { - logger_->log_error("Seeking to %d failed for file %s (does file/filesystem support seeking?)", offset, source); + logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source); throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); } } - while (input.good() && !input.eof()) { - bool invalidWrite = false; - uint64_t startTime = getTimeMillis(); - input.getline(charBuffer.data(), size, inputDelimiter); - - if (input.eof() || input.fail()) { + uint64_t startTime = 0U; + while (input.good()) { + input.read(reinterpret_cast<char*>(buffer.data()), buffer.size()); + std::streamsize read = input.gcount(); + if (read < 0) { + throw Exception(FILE_OPERATION_EXCEPTION, 
"std::ifstream::gcount returned negative value"); + } + if (read == 0) { logger_->log_trace("Finished reading input %s", source); break; } - flowFile = std::static_pointer_cast<FlowFileRecord>(create()); - claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); - - size_t bufsize = strlen(charBuffer.data()); - std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim); - if (nullptr == stream) { - logger_->log_debug("Stream is null"); - rollback(); - return; - } - - if (input.good()) { - if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), bufsize) < 0) { - invalidWrite = true; + uint8_t* begin = buffer.data(); + uint8_t* end = begin + read; + while (true) { + uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter)); + int len = delimiterPos - begin; + + /* + * We do not want to process the rest of the buffer after the last delimiter if + * - we have reached EOF in the file (we would discard it anyway) + * - there is nothing to process (the last character in the buffer is a delimiter) + */ + if (delimiterPos == end && (input.eof() || len == 0)) { break; } - } else { - if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), input.gcount()) < 0) { - invalidWrite = true; - break; + + /* Create claim and stream if needed and append data */ + if (claim == nullptr) { + startTime = getTimeMillis(); + claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); + } + if (stream == nullptr) { + stream = process_context_->getContentRepository()->write(claim); + } + if (stream == nullptr) { + logger_->log_error("Stream is null"); + rollback(); + return; + } + if (stream->write(begin, len) != len) { + logger_->log_error("Error while writing"); + stream->closeStream(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile"); } - } - if (!invalidWrite) { + /* Create a FlowFile if we reached a delimiter */ + if 
(delimiterPos == end) { + break; + } + flowFile = std::static_pointer_cast<FlowFileRecord>(create()); flowFile->setSize(stream->getSize()); flowFile->setOffset(0); if (flowFile->getResourceClaim() != nullptr) { - // Remove the old claim + /* Remove the old claim */ flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); flowFile->clearResourceClaim(); } @@ -606,37 +622,40 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flowFile, details, endTime - startTime); flows.push_back(flowFile); - } else { - logger_->log_debug("Error while writing"); - stream->closeStream(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile"); + + /* Reset these to start processing the next FlowFile with a clean slate */ + flowFile.reset(); + stream.reset(); + claim.reset(); + + /* Skip delimiter */ + begin = delimiterPos + 1; } } - input.close(); - logger_->log_trace("Closed input %s, keeping source ? %i", source, keepSource); - if (!keepSource) - std::remove(source.c_str()); - } else { - input.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + logger_->log_debug("Caught Exception during process session write"); + throw; } - } catch (std::exception &exception) { - if (flowFile && flowFile->getResourceClaim() == claim) { - flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); - flowFile->clearResourceClaim(); - } - logger_->log_debug("Caught Exception %s", exception.what()); - throw; } catch (...) 
{ - if (flowFile && flowFile->getResourceClaim() == claim) { + if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) { flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); flowFile->clearResourceClaim(); } - logger_->log_debug("Caught Exception during process session write"); throw; } } +void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) { + import(source, flows, offset, inputDelimiter); + logger_->log_trace("Closed input %s, keeping source ? %i", source, keepSource); + if (!keepSource) { + std::remove(source.c_str()); + } +} + bool ProcessSession::exportContent(const std::string &destination, const std::string &tmpFile, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) { logger_->log_debug("Exporting content of %s to %s", flow->getUUIDStr(), destination);
