Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 2da613875 -> 6220b37c1
MINIFICPP-430 Fixed improper size reporting/failed read for importing files or filesystems which do not support seek operations. This closes #282. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6220b37c Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6220b37c Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6220b37c Branch: refs/heads/master Commit: 6220b37c1076fbe22f8a99f32cbe285385c25e5d Parents: 2da6138 Author: Andrew I. Christianson <[email protected]> Authored: Fri Mar 16 12:18:28 2018 -0400 Committer: Marc Parisi <[email protected]> Committed: Sun Mar 18 21:16:15 2018 -0400 ---------------------------------------------------------------------- libminifi/src/core/ProcessSession.cpp | 26 ++++++++-- libminifi/test/unit/GetFileTests.cpp | 82 ++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6220b37c/libminifi/src/core/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index bd98db6..fde2bc6 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -417,10 +417,18 @@ void ProcessSession::import(std::string source, const std::shared_ptr<core::Flow rollback(); return; } - if (input.is_open()) { - // Open the source file and stream to the flow file - input.seekg(offset); + if (input.is_open() && input.good()) { bool invalidWrite = false; + // Open the source file and stream to the flow file + if (offset != 0) { + input.seekg(offset); + if (!input.good()) { + logger_->log_error("Seeking to %d failed for file %s (does file/filesystem support seeking?)", + offset, 
source); + invalidWrite = true; + } + } while (input.good()) { input.read(reinterpret_cast<char*>(charBuffer.data()), size); if (input) { @@ -494,8 +502,16 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow std::ifstream input; logger_->log_debug("Opening %s", source); input.open(source.c_str(), std::fstream::in | std::fstream::binary); - if (input.is_open()) { - input.seekg(offset, input.beg); + if (input.is_open() && input.good()) { + if (offset != 0) { + input.seekg(offset, input.beg); + if (!input.good()) { + logger_->log_error("Seeking to %d failed for file %s (does file/filesystem support seeking?)", + offset, + source); + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + } while (input.good()) { bool invalidWrite = false; flowFile = std::static_pointer_cast<FlowFileRecord>(create()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6220b37c/libminifi/test/unit/GetFileTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/GetFileTests.cpp b/libminifi/test/unit/GetFileTests.cpp new file mode 100644 index 0000000..6e8aa25 --- /dev/null +++ b/libminifi/test/unit/GetFileTests.cpp @@ -0,0 +1,82 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <utility> +#include <memory> +#include <string> +#include <vector> +#include <set> +#include <fstream> + + +#include "../TestBase.h" +#include "processors/LogAttribute.h" +#include "processors/GetFile.h" + +TEST_CASE("GetFile: FIFO", "[getFileFifo]") { // NOLINT + TestController testController; + + LogTestController::getInstance().setTrace<TestPlan>(); + LogTestController::getInstance().setTrace<processors::GetFile>(); + LogTestController::getInstance().setTrace<processors::LogAttribute>(); + + auto plan = testController.createPlan(); + auto repo = std::make_shared<TestRepository>(); + + // Define directory for input + std::string in_dir("/tmp/gt.XXXXXX"); + REQUIRE(testController.createTempDirectory(&in_dir[0]) != nullptr); + + // Define test input file + std::string in_file(in_dir); + in_file.append("/testfifo"); + + // Build MiNiFi processing graph + auto get_file = plan->addProcessor( + "GetFile", + "Get"); + plan->setProperty( + get_file, + processors::GetFile::Directory.getName(), in_dir); + plan->setProperty( + get_file, + processors::GetFile::KeepSourceFile.getName(), + "true"); + plan->addProcessor( + "LogAttribute", + "Log", + core::Relationship("success", "description"), + true); + + // Write test input + REQUIRE(0 == mkfifo(in_file.c_str(), 0777)); + + // Run test flow + std::thread write_thread([&] { + std::ofstream in_file_stream(in_file); + in_file_stream << "The quick brown fox jumps over the lazy dog" << std::endl; + }); + + plan->runNextProcessor(); // Get + plan->runNextProcessor(); // Log + + write_thread.join(); + + // Check log output + REQUIRE(LogTestController::getInstance().contains("Size:44 Offset:0")); +} +
