szaszm commented on a change in pull request #1262:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1262#discussion_r835754676



##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch 
from the file system. If not defined the default ${absolute.path}/${filename} 
path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the 
file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it 
has been fetched from the file system. "
+                        "This property is ignored unless the Completion 
Strategy is set to \"Move File\". If the directory does not exist, it will be 
created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file 
already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict 
should be resolved")
+      
->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when 
the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have 
sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is 
successfully fetched from the file system will be transferred to this 
Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the 
file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user 
running MiNiFi not having sufficient permissions will be transferred to this 
Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason 
other than insufficient permissions or the file not existing will be 
transferred to this Relationship.");
+
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> 
&/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = 
utils::parseEnumProperty<CompletionStrategyOption>(*context, 
CompletionStrategy);
+  std::string move_destination_dir;
+  context->getProperty(MoveDestinationDirectory.getName(), 
move_destination_dir);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && 
move_destination_dir.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is 
required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = 
utils::parseEnumProperty<MoveConflictStrategyOption>(*context, 
MoveConflictStrategy);
+  log_level_when_file_not_found_ = 
utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = 
utils::parseEnumProperty<LogLevelOption>(*context, 
LogLevelWhenPermissionDenied);
+}
+
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const 
std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string file_to_fetch_path;
+  context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
+  if (!file_to_fetch_path.empty()) {
+    return file_to_fetch_path;
+  }
+
+  flow_file->getAttribute("absolute.path", file_to_fetch_path);
+  std::string filename;
+  flow_file->getAttribute("filename", filename);
+  file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
+  return file_to_fetch_path;
+}
+
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& 
message) const {

Review comment:
       Consider making this a local template that forwards its arguments to the 
appropriate logging function, and use the formatting capabilities of the 
logger. This way, we avoid allocations of temporaries and use `%s` to 
interpolate the filenames.
   ```suggestion
   template<typename... Args>
   static void logWithLevel(LogLevelOption log_level, Args&&... args) const {
   ```
   and
   ```
         logger_->log_trace(std::forward<Args>(args)...);
   ```

##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch 
from the file system. If not defined the default ${absolute.path}/${filename} 
path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the 
file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it 
has been fetched from the file system. "
+                        "This property is ignored unless the Completion 
Strategy is set to \"Move File\". If the directory does not exist, it will be 
created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file 
already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict 
should be resolved")
+      
->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when 
the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have 
sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is 
successfully fetched from the file system will be transferred to this 
Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the 
file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user 
running MiNiFi not having sufficient permissions will be transferred to this 
Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason 
other than insufficient permissions or the file not existing will be 
transferred to this Relationship.");
+
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> 
&/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = 
utils::parseEnumProperty<CompletionStrategyOption>(*context, 
CompletionStrategy);
+  std::string move_destination_dir;
+  context->getProperty(MoveDestinationDirectory.getName(), 
move_destination_dir);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && 
move_destination_dir.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is 
required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = 
utils::parseEnumProperty<MoveConflictStrategyOption>(*context, 
MoveConflictStrategy);
+  log_level_when_file_not_found_ = 
utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = 
utils::parseEnumProperty<LogLevelOption>(*context, 
LogLevelWhenPermissionDenied);
+}
+
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const 
std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string file_to_fetch_path;
+  context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
+  if (!file_to_fetch_path.empty()) {
+    return file_to_fetch_path;
+  }
+
+  flow_file->getAttribute("absolute.path", file_to_fetch_path);
+  std::string filename;
+  flow_file->getAttribute("filename", filename);
+  file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
+  return file_to_fetch_path;

Review comment:
       Consider using `std::filesystem::path` for stronger typing.
   ```suggestion
     flow_file->getAttribute("absolute.path", file_to_fetch_path);
     std::string filename;
     flow_file->getAttribute("filename", filename);
     return std::filesystem::path{file_to_fetch_path} / filename;
   ```

##########
File path: extensions/standard-processors/tests/unit/FetchFileTests.cpp
##########
@@ -0,0 +1,348 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <unordered_set>
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "core/Property.h"
+#include "core/Processor.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/FetchFile.h"
+#include "processors/PutFile.h"
+#include "utils/TestUtils.h"
+#include "utils/IntegrationTestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace {
+
+class FetchFileTestFixture {
+ public:
+  FetchFileTestFixture();
+  ~FetchFileTestFixture();
+  std::unordered_multiset<std::string> getSuccessfulFlowFileContents() const;
+  std::unordered_multiset<std::string> getFailedFlowFileContents() const;
+  std::unordered_multiset<std::string> getNotFoundFlowFileContents() const;
+#ifndef WIN32
+  std::unordered_multiset<std::string> getPermissionDeniedFlowFileContents() 
const;
+#endif
+
+ protected:
+  std::unordered_multiset<std::string> getDirContents(const std::string& 
dir_path) const;
+
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> plan_;
+  const std::string input_dir_;
+  const std::string success_output_dir_;
+  const std::string failure_output_dir_;
+  const std::string not_found_output_dir_;
+  const std::string permission_denied_output_dir_;
+  const std::string permission_denied_file_name_;
+  const std::string input_file_name_;
+  const std::string file_content_;
+  std::shared_ptr<core::Processor> fetch_file_processor_;
+  std::shared_ptr<core::Processor> update_attribute_processor_;
+};
+
+FetchFileTestFixture::FetchFileTestFixture()
+  : plan_(test_controller_.createPlan()),
+    input_dir_(test_controller_.createTempDirectory()),
+    success_output_dir_(test_controller_.createTempDirectory()),
+    failure_output_dir_(test_controller_.createTempDirectory()),
+    not_found_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_file_name_("permission_denied.txt"),
+    input_file_name_("test.txt"),
+    file_content_("The quick brown fox jumps over the lazy dog\n")  {
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<minifi::processors::FetchFile>();
+  
LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
+
+  REQUIRE(!input_dir_.empty());
+  REQUIRE(!success_output_dir_.empty());
+  REQUIRE(!failure_output_dir_.empty());
+  REQUIRE(!not_found_output_dir_.empty());
+  REQUIRE(!permission_denied_output_dir_.empty());
+
+  auto generate_flow_file_processor = plan_->addProcessor("GenerateFlowFile", 
"GenerateFlowFile");
+  plan_->setProperty(generate_flow_file_processor, 
org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), 
"0B");
+  update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", 
"UpdateAttribute", core::Relationship("success", "description"), true);
+  plan_->setProperty(update_attribute_processor_, "absolute.path", input_dir_, 
true);
+  plan_->setProperty(update_attribute_processor_, "filename", 
input_file_name_, true);
+
+  fetch_file_processor_ = plan_->addProcessor("FetchFile", "FetchFile", 
core::Relationship("success", "description"), true);
+
+  auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { 
{"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"success", "d"}, 
success_putfile);
+  success_putfile->setAutoTerminatedRelationships({{"success", "d"}, 
{"failure", "d"}});
+  plan_->setProperty(success_putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
success_output_dir_);
+
+  auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", { 
{"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"failure", "d"}, 
failure_putfile);
+  failure_putfile->setAutoTerminatedRelationships({{"success", "d"}, 
{"failure", "d"}});
+  plan_->setProperty(failure_putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
failure_output_dir_);
+
+  auto not_found_putfile = plan_->addProcessor("PutFile", "NotFoundPutFile", { 
{"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"not.found", "d"}, 
not_found_putfile);
+  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, 
{"not.found", "d"}});
+  plan_->setProperty(not_found_putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
not_found_output_dir_);
+
+  auto permission_denied_putfile = plan_->addProcessor("PutFile", 
"PermissionDeniedPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"permission.denied", "d"}, 
permission_denied_putfile);
+  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, 
{"permission.denied", "d"}});
+  plan_->setProperty(permission_denied_putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
permission_denied_output_dir_);
+
+  utils::putFileToDir(input_dir_, input_file_name_, file_content_);
+  utils::putFileToDir(input_dir_, permission_denied_file_name_, file_content_);
+#ifndef WIN32
+  utils::file::FileUtils::set_permissions(input_dir_ + 
utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0);
+#endif
+}
+
+FetchFileTestFixture::~FetchFileTestFixture() {
+#ifndef WIN32
+  utils::file::FileUtils::set_permissions(input_dir_ + 
utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0644);
+#endif

Review comment:
       You can use 
[`std::filesystem::permissions`](https://en.cppreference.com/w/cpp/filesystem/permissions)
 for cross platform coverage.

##########
File path: extensions/standard-processors/tests/unit/FetchFileTests.cpp
##########
@@ -0,0 +1,348 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <unordered_set>
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "core/Property.h"
+#include "core/Processor.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/FetchFile.h"
+#include "processors/PutFile.h"
+#include "utils/TestUtils.h"
+#include "utils/IntegrationTestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace {
+
+class FetchFileTestFixture {
+ public:
+  FetchFileTestFixture();
+  ~FetchFileTestFixture();
+  std::unordered_multiset<std::string> getSuccessfulFlowFileContents() const;
+  std::unordered_multiset<std::string> getFailedFlowFileContents() const;
+  std::unordered_multiset<std::string> getNotFoundFlowFileContents() const;
+#ifndef WIN32
+  std::unordered_multiset<std::string> getPermissionDeniedFlowFileContents() 
const;
+#endif
+
+ protected:
+  std::unordered_multiset<std::string> getDirContents(const std::string& 
dir_path) const;
+
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> plan_;
+  const std::string input_dir_;
+  const std::string success_output_dir_;
+  const std::string failure_output_dir_;
+  const std::string not_found_output_dir_;
+  const std::string permission_denied_output_dir_;
+  const std::string permission_denied_file_name_;
+  const std::string input_file_name_;
+  const std::string file_content_;
+  std::shared_ptr<core::Processor> fetch_file_processor_;
+  std::shared_ptr<core::Processor> update_attribute_processor_;
+};
+
+FetchFileTestFixture::FetchFileTestFixture()
+  : plan_(test_controller_.createPlan()),
+    input_dir_(test_controller_.createTempDirectory()),
+    success_output_dir_(test_controller_.createTempDirectory()),
+    failure_output_dir_(test_controller_.createTempDirectory()),
+    not_found_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_file_name_("permission_denied.txt"),
+    input_file_name_("test.txt"),
+    file_content_("The quick brown fox jumps over the lazy dog\n")  {
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<minifi::processors::FetchFile>();
+  
LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
+
+  REQUIRE(!input_dir_.empty());
+  REQUIRE(!success_output_dir_.empty());
+  REQUIRE(!failure_output_dir_.empty());
+  REQUIRE(!not_found_output_dir_.empty());
+  REQUIRE(!permission_denied_output_dir_.empty());
+
+  auto generate_flow_file_processor = plan_->addProcessor("GenerateFlowFile", 
"GenerateFlowFile");
+  plan_->setProperty(generate_flow_file_processor, 
org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), 
"0B");
+  update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", 
"UpdateAttribute", core::Relationship("success", "description"), true);
+  plan_->setProperty(update_attribute_processor_, "absolute.path", input_dir_, 
true);
+  plan_->setProperty(update_attribute_processor_, "filename", 
input_file_name_, true);
+
+  fetch_file_processor_ = plan_->addProcessor("FetchFile", "FetchFile", 
core::Relationship("success", "description"), true);
+
+  auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { 
{"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"success", "d"}, 
success_putfile);
+  success_putfile->setAutoTerminatedRelationships({{"success", "d"}, 
{"failure", "d"}});
+  plan_->setProperty(success_putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
success_output_dir_);
+
+  auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", { 
{"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"failure", "d"}, 
failure_putfile);
+  failure_putfile->setAutoTerminatedRelationships({{"success", "d"}, 
{"failure", "d"}});
+  plan_->setProperty(failure_putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
failure_output_dir_);
+
+  auto not_found_putfile = plan_->addProcessor("PutFile", "NotFoundPutFile", { 
{"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"not.found", "d"}, 
not_found_putfile);
+  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, 
{"not.found", "d"}});
+  plan_->setProperty(not_found_putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
not_found_output_dir_);
+
+  auto permission_denied_putfile = plan_->addProcessor("PutFile", 
"PermissionDeniedPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"permission.denied", "d"}, 
permission_denied_putfile);
+  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, 
{"permission.denied", "d"}});
+  plan_->setProperty(permission_denied_putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
permission_denied_output_dir_);

Review comment:
       You could simplify this using `SingleInputTestController`. You can 
specify one input flow file, trigger the processor once, and observe each 
output relationship directly for flow files, eliminating the need for 
`GenerateFlowFile`, `UpdateAttribute` and each `PutFile`. Its signature looks 
like this:
   ```
     std::unordered_map<core::Relationship, 
std::vector<std::shared_ptr<core::FlowFile>>>
     trigger(const std::string_view input_flow_file_content, 
std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
   ```

##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch 
from the file system. If not defined the default ${absolute.path}/${filename} 
path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the 
file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it 
has been fetched from the file system. "
+                        "This property is ignored unless the Completion 
Strategy is set to \"Move File\". If the directory does not exist, it will be 
created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file 
already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict 
should be resolved")
+      
->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when 
the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have 
sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is 
successfully fetched from the file system will be transferred to this 
Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the 
file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user 
running MiNiFi not having sufficient permissions will be transferred to this 
Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason 
other than insufficient permissions or the file not existing will be 
transferred to this Relationship.");
+
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> 
&/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = 
utils::parseEnumProperty<CompletionStrategyOption>(*context, 
CompletionStrategy);
+  std::string move_destination_dir;
+  context->getProperty(MoveDestinationDirectory.getName(), 
move_destination_dir);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && 
move_destination_dir.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is 
required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = 
utils::parseEnumProperty<MoveConflictStrategyOption>(*context, 
MoveConflictStrategy);
+  log_level_when_file_not_found_ = 
utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = 
utils::parseEnumProperty<LogLevelOption>(*context, 
LogLevelWhenPermissionDenied);
+}
+
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const 
std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string file_to_fetch_path;
+  context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
+  if (!file_to_fetch_path.empty()) {
+    return file_to_fetch_path;
+  }
+
+  flow_file->getAttribute("absolute.path", file_to_fetch_path);
+  std::string filename;
+  flow_file->getAttribute("filename", filename);
+  file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
+  return file_to_fetch_path;
+}
+
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& 
message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      break;
+  }
+}
+
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const 
{
+  return move_destination_directory_ + utils::file::FileUtils::get_separator() 
+ file_name;
+}
+
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name));
+}
+
+bool FetchFile::moveWouldFailWithDestinationconflict(const std::string& 
file_name) const {

Review comment:
       typo: 'c' of conflict should be capital

##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch 
from the file system. If not defined the default ${absolute.path}/${filename} 
path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the 
file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it 
has been fetched from the file system. "
+                        "This property is ignored unless the Completion 
Strategy is set to \"Move File\". If the directory does not exist, it will be 
created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file 
already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict 
should be resolved")
+      
->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when 
the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have 
sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is 
successfully fetched from the file system will be transferred to this 
Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the 
file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user 
running MiNiFi not having sufficient permissions will be transferred to this 
Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason 
other than insufficient permissions or the file not existing will be 
transferred to this Relationship.");
+
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> 
&/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = 
utils::parseEnumProperty<CompletionStrategyOption>(*context, 
CompletionStrategy);
+  std::string move_destination_dir;
+  context->getProperty(MoveDestinationDirectory.getName(), 
move_destination_dir);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && 
move_destination_dir.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is 
required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = 
utils::parseEnumProperty<MoveConflictStrategyOption>(*context, 
MoveConflictStrategy);
+  log_level_when_file_not_found_ = 
utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = 
utils::parseEnumProperty<LogLevelOption>(*context, 
LogLevelWhenPermissionDenied);
+}
+
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const 
std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string file_to_fetch_path;
+  context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
+  if (!file_to_fetch_path.empty()) {
+    return file_to_fetch_path;
+  }
+
+  flow_file->getAttribute("absolute.path", file_to_fetch_path);
+  std::string filename;
+  flow_file->getAttribute("filename", filename);
+  file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
+  return file_to_fetch_path;
+}
+
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& 
message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      break;
+  }
+}
+
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const 
{
+  return move_destination_directory_ + utils::file::FileUtils::get_separator() 
+ file_name;
+}
+
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name));
+}
+
+bool FetchFile::moveWouldFailWithDestinationconflict(const std::string& 
file_name) const {
+  if (completion_strategy_ != CompletionStrategyOption::MOVE_FILE || 
move_confict_strategy_ != MoveConflictStrategyOption::FAIL) {
+    return false;
+  }
+
+  return moveDestinationConflicts(file_name);
+}
+
+void FetchFile::executeMoveConflictStrategy(const std::string& 
file_to_fetch_path, const std::string& file_name) {
+  if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
+    auto moved_path = getMoveAbsolutePath(file_name);
+    logger_->log_info("Due to conflict replacing file '%s' by the Move 
Completion Strategy", moved_path);
+    std::filesystem::rename(file_to_fetch_path, moved_path);
+  } else if (move_confict_strategy_ == MoveConflictStrategyOption::RENAME) {
+    auto generated_filename = 
utils::IdGenerator::getIdGenerator()->generate().to_string();
+    logger_->log_info("Due to conflict file '%s' is moved with generated name 
'%s' by the Move Completion Strategy", file_to_fetch_path, generated_filename);
+    std::filesystem::rename(file_to_fetch_path, 
getMoveAbsolutePath(generated_filename));
+  } else if (move_confict_strategy_ == 
MoveConflictStrategyOption::KEEP_EXISTING) {
+    logger_->log_info("Due to conflict file '%s' is deleted by the Move 
Completion Strategy", file_to_fetch_path);

Review comment:
       I would reduce these log levels (debug?) to avoid spamming the logs on 
high traffic deployments when everything is working normally.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to