szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r436597137



##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
     state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-    state_recovered_ = true;
-    // recover the state if we have not done so
     this->recoverState(context);
+    state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+    checkForRemovedFiles();
+    checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto &state : tail_states_) {
-    auto fileLocation = state.second.path_;
-
-    checkRollOver(context, state.second, state.first);
-    std::string fullPath = fileLocation + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-    struct stat statbuf;
-
-    logger_->log_debug("Tailing file %s from %llu", fullPath, state.second.currentTailFilePosition_);
-    if (stat(fullPath.c_str(), &statbuf) == 0) {
-      if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) {
-        logger_->log_trace("Current pos: %llu", state.second.currentTailFilePosition_);
-        logger_->log_trace("%s", "there are no new input for the current tail file");
-        context->yield();
-        return;
-      }
-      std::size_t found = state.first.find_last_of(".");
-      std::string baseName = state.first.substr(0, found);
-      std::string extension = state.first.substr(found + 1);
-
-      if (!delimiter_.empty()) {
-        char delim = delimiter_.c_str()[0];
-        if (delim == '\\') {
-          if (delimiter_.size() > 1) {
-            switch (delimiter_.c_str()[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                // previous behavior
-                break;
-            }
-          }
-        }
-        logger_->log_debug("Looking for delimiter 0x%X", delim);
-        std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
-        session->import(fullPath, flowFiles, state.second.currentTailFilePosition_, delim);
-        logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size());
-
-        for (auto ffr : flowFiles) {
-          logger_->log_info("TailFile %s for %u bytes", state.first, ffr->getSize());
-          std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + extension;
-          ffr->updateKeyedAttribute(PATH, fileLocation);
-          ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          ffr->updateKeyedAttribute(FILENAME, logName);
-          session->transfer(ffr, Success);
-          state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-          storeState(context);
-        }
+    processFile(context, session, state.first, state.second);
+  }
 
-      } else {
-        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-        if (flowFile) {
-          flowFile->updateKeyedAttribute(PATH, fileLocation);
-          flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          session->import(fullPath, flowFile, true, state.second.currentTailFilePosition_);
-          session->transfer(flowFile, Success);
-          logger_->log_info("TailFile %s for %llu bytes", state.first, flowFile->getSize());
-          std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + "."
-              + extension;
-          flowFile->updateKeyedAttribute(FILENAME, logName);
-          state.second.currentTailFilePosition_ += flowFile->getSize();
-          storeState(context);
-        }
-      }
-      state.second.currentTailFileModificationTime_ = ((uint64_t) (statbuf.st_mtime) * 1000);
-    } else {
-      logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+    yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr<core::ProcessContext> &context,
+                           const std::shared_ptr<core::ProcessSession> &session,
+                           const std::string &fileName,
+                           TailState &state) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < state.position_) {
+    processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessContext> &context,
+                                   const std::shared_ptr<core::ProcessSession> &session,
+                                   TailState &state) {
+    std::vector<TailState> rotated_file_states = findRotatedFiles(state);
+    for (TailState &file_state : rotated_file_states) {
+      processSingleFile(context, session, file_state.file_name_, file_state);
     }
+    state.position_ = 0;
+    state.checksum_ = 0;
+}
+
+void TailFile::processSingleFile(const std::shared_ptr<core::ProcessContext> &context,
+                                 const std::shared_ptr<core::ProcessSession> &session,
+                                 const std::string &fileName,
+                                 TailState &state) {
+  std::string full_file_name = state.fileNameWithPath();
+
+  if (utils::file::FileUtils::file_size(full_file_name) == 0u) {
+    logger_->log_warn("Unable to read file %s as it does not exist or has size zero", full_file_name);
+    return;
+  }
+  logger_->log_debug("Tailing file %s from %llu", full_file_name, state.position_);
+
+  std::size_t last_dot_position = fileName.find_last_of('.');
+  std::string baseName = fileName.substr(0, last_dot_position);
+  std::string extension = fileName.substr(last_dot_position + 1);
+
+  if (!delimiter_.empty()) {
+    char delim = delimiter_[0];
+    logger_->log_trace("Looking for delimiter 0x%X", delim);
+
+    std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
+    uint64_t checksum = state.checksum_;
+    session->import(full_file_name, flowFiles, state.position_, delim, true, &checksum);
+    logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size());
+
+    for (auto &ffr : flowFiles) {

Review comment:
       The difference is clear to me, but I can only speak for myself. I'm OK with leaving it as is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to