This is an automated email from the ASF dual-hosted git repository. phrocker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 7cb81f17982f3b408be5ba34c0bf07113b68d915 Author: Arpad Boda <[email protected]> AuthorDate: Thu Mar 28 12:13:52 2019 +0100 MINIFICPP-792 - TailFile processor should handle rotation of source file --- .../standard-processors/processors/TailFile.cpp | 134 ++++++--------- .../standard-processors/processors/TailFile.h | 2 + .../tests/unit/TailFileTests.cpp | 179 ++++++++++++++++++++- 3 files changed, 227 insertions(+), 88 deletions(-) diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp index 71314b5..8302aeb 100644 --- a/extensions/standard-processors/processors/TailFile.cpp +++ b/extensions/standard-processors/processors/TailFile.cpp @@ -36,6 +36,7 @@ #include <sstream> #include <string> #include <iostream> +#include "utils/file/FileUtils.h" #include "utils/TimeUtil.h" #include "utils/StringUtils.h" #include "TailFile.h" @@ -164,90 +165,57 @@ void TailFile::storeState() { static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) { return (i.modifiedTime < j.modifiedTime); } -void TailFile::checkRollOver(const std::string &fileLocation, const std::string &fileName) { +void TailFile::checkRollOver(const std::string &fileLocation, const std::string &baseFileName) { struct stat statbuf; std::vector<TailMatchedFileItem> matchedFiles; std::string fullPath = fileLocation + "/" + _currentTailFileName; if (stat(fullPath.c_str(), &statbuf) == 0) { - if (statbuf.st_size > this->_currentTailFilePosition) - // there are new input for the current tail file - return; - - uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000); - std::string pattern = fileName; - std::size_t found = fileName.find_last_of("."); + logger_->log_trace("Searching for files rolled over"); + std::string pattern = baseFileName; + std::size_t found = baseFileName.find_last_of("."); if (found != std::string::npos) - pattern = fileName.substr(0, found); -#ifndef WIN32 - DIR *d; - d = 
opendir(fileLocation.c_str()); - if (!d) - return; - while (1) { - struct dirent *entry; - entry = readdir(d); - if (!entry) - break; - std::string d_name = entry->d_name; - if (!(entry->d_type & DT_DIR)) { - std::string fileName = d_name; - std::string fileFullName = fileLocation + "/" + d_name; - if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) { - if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile) { - TailMatchedFileItem item; - item.fileName = fileName; - item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); - matchedFiles.push_back(item); + pattern = baseFileName.substr(0, found); + + // Callback, called for each file entry in the listed directory + // Return value is used to break (false) or continue (true) listing + auto lambda = [&](const std::string& path, const std::string& filename) -> bool { + struct stat sb; + std::string fileFullName = path + utils::file::FileUtils::get_separator() + filename; + if ((fileFullName.find(pattern) != std::string::npos) && stat(fileFullName.c_str(), &sb) == 0) { + uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000); + if (candidateModTime >= _currentTailFileModificationTime) { + if (filename == _currentTailFileName && candidateModTime == _currentTailFileModificationTime && + sb.st_size == _currentTailFilePosition) { + return true; // Skip the current file as a candidate in case it wasn't updated } + TailMatchedFileItem item; + item.fileName = filename; + item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000); + matchedFiles.push_back(item); } } - } - closedir(d); -#else - - HANDLE hFind; - WIN32_FIND_DATA FindFileData; - - if ((hFind = FindFirstFile(fileLocation.c_str(), &FindFileData)) != INVALID_HANDLE_VALUE) { - do { - struct stat statbuf {}; - if (stat(FindFileData.cFileName, &statbuf) != 0) { - logger_->log_warn("Failed to stat %s", FindFileData.cFileName); - break; - } + return true; + }; - std::string fileFullName = fileLocation 
+ "/" + FindFileData.cFileName; + utils::file::FileUtils::list_dir(fileLocation, lambda, logger_, false); - if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) { - if (((uint64_t)(statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile) { - TailMatchedFileItem item; - item.fileName = fileName; - item.modifiedTime = ((uint64_t)(statbuf.st_mtime) * 1000); - matchedFiles.push_back(item); - } - } - }while (FindNextFile(hFind, &FindFileData)); - FindClose(hFind); + if (matchedFiles.size() < 1) { + logger_->log_debug("No newer files found in directory!"); + return; } -#endif // Sort the list based on modified time std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem); - for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it != matchedFiles.end(); ++it) { - TailMatchedFileItem item = *it; - if (item.fileName == _currentTailFileName) { - ++it; - if (it != matchedFiles.end()) { - TailMatchedFileItem nextItem = *it; - logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName, nextItem.fileName); - _currentTailFileName = nextItem.fileName; - _currentTailFilePosition = 0; - storeState(); - } - break; - } + TailMatchedFileItem item = matchedFiles[0]; + logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName, item.fileName); + + // Going ahead in the file rolled over + if (_currentTailFileName != baseFileName) { + _currentTailFilePosition = 0; } + _currentTailFileName = item.fileName; + storeState(); } else { return; } @@ -259,7 +227,7 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se std::string fileLocation = ""; std::string fileName = ""; if (context->getProperty(FileName.getName(), value)) { - std::size_t found = value.find_last_of("/\\"); + std::size_t found = value.find_last_of(utils::file::FileUtils::get_separator()); fileLocation = value.substr(0, found); fileName = value.substr(found + 1); } @@ 
-280,7 +248,8 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se logger_->log_debug("Tailing file %s", fullPath); if (stat(fullPath.c_str(), &statbuf) == 0) { if ((uint64_t) statbuf.st_size <= this->_currentTailFilePosition) { - // there are no new input for the current tail file + logger_->log_trace("Current pos: %llu", this->_currentTailFilePosition); + logger_->log_trace("%s", "there are no new input for the current tail file"); context->yield(); return; } @@ -329,19 +298,20 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se } else { std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); - if (!flowFile) - return; - flowFile->updateKeyedAttribute(PATH, fileLocation); - flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath); - session->import(fullPath, flowFile, true, this->_currentTailFilePosition); - session->transfer(flowFile, Success); - logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, flowFile->getSize()); - std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension; - flowFile->updateKeyedAttribute(FILENAME, logName); - this->_currentTailFilePosition += flowFile->getSize(); - storeState(); + if (flowFile) { + flowFile->updateKeyedAttribute(PATH, fileLocation); + flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath); + session->import(fullPath, flowFile, true, this->_currentTailFilePosition); + session->transfer(flowFile, Success); + logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, flowFile->getSize()); + std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." 
+ extension; + flowFile->updateKeyedAttribute(FILENAME, logName); + this->_currentTailFilePosition += flowFile->getSize(); + storeState(); + } } - + _currentTailFileModificationTime = ((uint64_t) (statbuf.st_mtime) * 1000); } else { logger_->log_warn("Unable to stat file %s", fullPath); } diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h index 3714023..54296a2 100644 --- a/extensions/standard-processors/processors/TailFile.h +++ b/extensions/standard-processors/processors/TailFile.h @@ -43,6 +43,7 @@ class TailFile : public core::Processor { explicit TailFile(std::string name, utils::Identifier uuid = utils::Identifier()) : core::Processor(name, uuid), _currentTailFilePosition(0), + _currentTailFileModificationTime(0), logger_(logging::LoggerFactory<TailFile>::getLogger()) { _stateRecovered = false; } @@ -89,6 +90,7 @@ class TailFile : public core::Processor { // determine if state is recovered; bool _stateRecovered; uint64_t _currentTailFilePosition; + uint64_t _currentTailFileModificationTime; static const int BUFFER_SIZE = 512; // Utils functions for parse state file diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp index c5cf392..b7c96ec 100644 --- a/extensions/standard-processors/tests/unit/TailFileTests.cpp +++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp @@ -16,6 +16,7 @@ * limitations under the License. 
*/ +#include <stdio.h> #include <uuid/uuid.h> #include <fstream> #include <map> @@ -37,13 +38,13 @@ #include "LogAttribute.h" -static const char *NEWLINE_FILE = "" +static std::string NEWLINE_FILE = "" // NOLINT "one,two,three\n" "four,five,six, seven"; static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt"; static const char *STATE_FILE = "/tmp/minifi-state-file.txt"; -TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") { +TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") { // Create and write to the test file std::ofstream tmpfile; tmpfile.open(TMP_FILE); @@ -76,8 +77,8 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") { LogTestController::getInstance().reset(); // Delete the test and state file. - std::remove(TMP_FILE); - std::remove(STATE_FILE); + remove(TMP_FILE); + remove(STATE_FILE); } TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") { @@ -113,9 +114,175 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") { LogTestController::getInstance().reset(); // Delete the test and state file. 
- std::remove(TMP_FILE); - std::remove(STATE_FILE); + remove(TMP_FILE); + remove(STATE_FILE); +} + +TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") { + TestController testController; + + const char DELIM = ','; + size_t expected_pieces = std::count(NEWLINE_FILE.begin(), NEWLINE_FILE.end(), DELIM); // The last piece is left as considered unfinished + + + LogTestController::getInstance().setTrace<TestPlan>(); + LogTestController::getInstance().setTrace<processors::TailFile>(); + LogTestController::getInstance().setTrace<processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + + auto plan = testController.createPlan(); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + // Define test input file + std::string in_file(dir); + in_file.append("/testfifo.txt"); + + std::string state_file(dir); + state_file.append("tailfile.state"); + + std::ofstream in_file_stream(in_file); + in_file_stream << NEWLINE_FILE; + in_file_stream.flush(); + + // Build MiNiFi processing graph + auto tail_file = plan->addProcessor( + "TailFile", + "Tail"); + plan->setProperty( + tail_file, + processors::TailFile::Delimiter.getName(), std::string(1, DELIM)); + plan->setProperty( + tail_file, + processors::TailFile::FileName.getName(), in_file); + plan->setProperty( + tail_file, + processors::TailFile::StateFile.getName(), state_file); + auto log_attr = plan->addProcessor( + "LogAttribute", + "Log", + core::Relationship("success", "description"), + true); + plan->setProperty( + log_attr, + processors::LogAttribute::FlowFilesToLog.getName(), "0"); + // Log as many FFs as it can to make sure exactly the expected amount is produced + + + plan->runNextProcessor(); // Tail + plan->runNextProcessor(); // Log + + REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + std::to_string(expected_pieces) + " flow files")); + + in_file_stream << DELIM; + in_file_stream.close(); + + + 
std::string rotated_file = (in_file + ".1"); + + REQUIRE(rename(in_file.c_str(), rotated_file.c_str() ) == 0); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // make sure the new file gets newer modification time + + std::ofstream new_in_file_stream(in_file); + new_in_file_stream << "five" << DELIM << "six" << DELIM; + new_in_file_stream.close(); + + plan->reset(); + plan->runNextProcessor(); // Tail + plan->runNextProcessor(); // Log + + // Find the last flow file in the rotated file + REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); + + plan->reset(); + plan->runNextProcessor(); // Tail + plan->runNextProcessor(); // Log + + // Two new flow files from the new file + REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); } + +TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") { + TestController testController; + + const char DELIM = ':'; + + LogTestController::getInstance().setTrace<TestPlan>(); + LogTestController::getInstance().setTrace<processors::TailFile>(); + LogTestController::getInstance().setTrace<processors::LogAttribute>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + + auto plan = testController.createPlan(); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::string state_file(dir); + state_file.append("tailfile.state"); + + // Define test input file + std::string in_file(dir); + in_file.append("/fruits.txt"); + + for (int i = 2; 0 <= i; --i) { + if (i < 2) { + std::this_thread::sleep_for( + std::chrono::milliseconds(1000)); // make sure the new file gets newer modification time + } + std::ofstream in_file_stream(in_file + (i > 0 ? 
std::to_string(i) : "")); + for (int j = 0; j <= i; j++) { + in_file_stream << "Apple" << DELIM; + } + in_file_stream.close(); + } + + // Build MiNiFi processing graph + auto tail_file = plan->addProcessor( + "TailFile", + "Tail"); + plan->setProperty( + tail_file, + processors::TailFile::Delimiter.getName(), std::string(1, DELIM)); + plan->setProperty( + tail_file, + processors::TailFile::FileName.getName(), in_file); + plan->setProperty( + tail_file, + processors::TailFile::StateFile.getName(), state_file); + auto log_attr = plan->addProcessor( + "LogAttribute", + "Log", + core::Relationship("success", "description"), + true); + plan->setProperty( + log_attr, + processors::LogAttribute::FlowFilesToLog.getName(), "0"); + // Log as many FFs as it can to make sure exactly the expected amount is produced + + + // Each iteration should go through one file and log all flowfiles + for (int i = 2; 0 <= i; --i) { + plan->reset(); + plan->runNextProcessor(); // Tail + plan->runNextProcessor(); // Log + + REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + std::to_string(i + 1) + " flow files")); + } + + // Write some more data to the source file + std::ofstream in_file_stream(in_file); + in_file_stream << "Pear" << DELIM << "Cherry" << DELIM; + + plan->reset(); + plan->runNextProcessor(); // Tail + plan->runNextProcessor(); // Log + + REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow files"))); +} + + /* TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") { try {
