This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 866c3f3 MINIFICPP-1740 Add FetchFile processor
866c3f3 is described below
commit 866c3f36fa19fe6ebc7f9783f8e16ce59de8f958
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Mar 28 11:10:34 2022 +0200
MINIFICPP-1740 Add FetchFile processor
FetchFile added mostly according to NiFi's implementation. A difference
between this and NiFi's implementation is that the flow is only
transferred to failure due to completion strategy failure if it is
explicitly set to fail (in case of move strategy conflict). In NiFi's
case it's a bit inconsistent as it could also fail if the move target
directory could not be created or it has permission problems, but the
flow is transferred to success in any other completion strategy failure.
Closes #1262
Signed-off-by: Marton Szasz <[email protected]>
---
PROCESSORS.md | 29 ++
README.md | 2 +-
.../standard-processors/processors/FetchFile.cpp | 262 ++++++++++++++++
.../standard-processors/processors/FetchFile.h | 96 ++++++
.../tests/unit/FetchFileTests.cpp | 348 +++++++++++++++++++++
libminifi/include/utils/FileReaderCallback.h | 3 +-
libminifi/src/utils/FileReaderCallback.cpp | 4 +-
7 files changed, 740 insertions(+), 4 deletions(-)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 0a3adc1..54caa87 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -24,6 +24,7 @@
- [ExtractText](#extracttext)
- [FetchAzureBlobStorage](#fetchazureblobstorage)
- [FetchAzureDataLakeStorage](#fetchazuredatalakestorage)
+- [FetchFile](#fetchfile)
- [FetchOPCProcessor](#fetchopcprocessor)
- [FetchS3Object](#fetchs3object)
- [FetchSFTP](#fetchsftp)
@@ -598,6 +599,34 @@ In the list below, the names of required properties appear
in bold. Any other pr
|success|Files that have been successfully fetched from Azure storage are
transferred to this relationship|
+## FetchFile
+
+### Description
+
+Reads the contents of a file from disk and streams it into the contents of an
incoming FlowFile. Once this is done, the file is optionally moved elsewhere or
deleted to help keep the file system organized.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|File to Fetch|||The fully-qualified filename of the file to fetch from the
file system. If not defined the default ${absolute.path}/${filename} path is
used.<br/>**Supports Expression Language: true**|
+|**Completion Strategy**|None|None<br/>Move File<br/>Delete File|Specifies
what to do with the original file on the file system once it has been pulled
into MiNiFi|
+|Move Destination Directory|||The directory to move the original file to once
it has been fetched from the file system. This property is ignored unless the
Completion Strategy is set to "Move File". If the directory does not exist, it
will be created.<br/>**Supports Expression Language: true**|
+|**Move Conflict Strategy**|Rename|Rename<br/>Replace File<br/>Keep
Existing<br/>Fail|If Completion Strategy is set to Move File and a file already
exists in the destination directory with the same name, this property specifies
how that naming conflict should be resolved|
+|**Log level when file not
found**|ERROR|TRACE<br/>DEBUG<br/>INFO<br/>WARN<br/>ERROR<br/>OFF|Log level to
use in case the file does not exist when the processor is triggered|
+|**Log level when permission
denied**|ERROR|TRACE<br/>DEBUG<br/>INFO<br/>WARN<br/>ERROR<br/>OFF|Log level to
use in case agent does not have sufficient permissions to read the file|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|Any FlowFile that is successfully fetched from the file system will
be transferred to this Relationship.|
+|not.found|Any FlowFile that could not be fetched from the file system because
the file could not be found will be transferred to this Relationship.|
+|permission.denied|Any FlowFile that could not be fetched from the file system
due to the user running MiNiFi not having sufficient permissions will be
transferred to this Relationship.|
+|failure|Any FlowFile that could not be fetched from the file system for any
reason other than insufficient permissions or the file not existing will be
transferred to this Relationship.|
+
+
## FetchOPCProcessor
### Description
diff --git a/README.md b/README.md
index 242c64c..74de4a0 100644
--- a/README.md
+++ b/README.md
@@ -65,7 +65,7 @@ The following table lists the base set of processors.
| Extension Set | Processors |
| ------------- |:-------------|
-| **Base** |
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>
[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P
[...]
+| **Base** |
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PRO
[...]
The next table outlines CMAKE flags that correspond with MiNiFi extensions.
Extensions that are enabled by default ( such as CURL ), can be disabled with
the respective CMAKE flag on the command line.
diff --git a/extensions/standard-processors/processors/FetchFile.cpp
b/extensions/standard-processors/processors/FetchFile.cpp
new file mode 100644
index 0000000..924b540
--- /dev/null
+++ b/extensions/standard-processors/processors/FetchFile.cpp
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property FetchFile::FileToFetch(
+ core::PropertyBuilder::createProperty("File to Fetch")
+ ->withDescription("The fully-qualified filename of the file to fetch
from the file system. If not defined the default ${absolute.path}/${filename}
path is used.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+ core::PropertyBuilder::createProperty("Completion Strategy")
+ ->withDescription("Specifies what to do with the original file on the
file system once it has been pulled into MiNiFi")
+ ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+ ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+ ->isRequired(true)
+ ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+ core::PropertyBuilder::createProperty("Move Destination Directory")
+ ->withDescription("The directory to move the original file to once it
has been fetched from the file system. "
+ "This property is ignored unless the Completion
Strategy is set to \"Move File\". If the directory does not exist, it will be
created.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+ core::PropertyBuilder::createProperty("Move Conflict Strategy")
+ ->withDescription("If Completion Strategy is set to Move File and a file
already exists in the destination directory with the same name, "
+ "this property specifies how that naming conflict
should be resolved")
+
->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+ ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+ ->isRequired(true)
+ ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+ core::PropertyBuilder::createProperty("Log level when file not found")
+ ->withDescription("Log level to use in case the file does not exist when
the processor is triggered")
+ ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+ ->withAllowableValues<std::string>(LogLevelOption::values())
+ ->isRequired(true)
+ ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+ core::PropertyBuilder::createProperty("Log level when permission denied")
+ ->withDescription("Log level to use in case agent does not have
sufficient permissions to read the file")
+ ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+ ->withAllowableValues<std::string>(LogLevelOption::values())
+ ->isRequired(true)
+ ->build());
+
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is
successfully fetched from the file system will be transferred to this
Relationship.");
+const core::Relationship FetchFile::NotFound(
+ "not.found",
+ "Any FlowFile that could not be fetched from the file system because the
file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+ "permission.denied",
+ "Any FlowFile that could not be fetched from the file system due to the user
running MiNiFi not having sufficient permissions will be transferred to this
Relationship.");
+const core::Relationship FetchFile::Failure(
+ "failure",
+ "Any FlowFile that could not be fetched from the file system for any reason
other than insufficient permissions or the file not existing will be
transferred to this Relationship.");
+
+void FetchFile::initialize() {
+ setSupportedProperties({
+ FileToFetch,
+ CompletionStrategy,
+ MoveDestinationDirectory,
+ MoveConflictStrategy,
+ LogLevelWhenFileNotFound,
+ LogLevelWhenPermissionDenied
+ });
+
+ setSupportedRelationships({
+ Success,
+ NotFound,
+ PermissionDenied,
+ Failure
+ });
+}
+
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory>
&/*sessionFactory*/) {
+ gsl_Expects(context);
+ completion_strategy_ =
utils::parseEnumProperty<CompletionStrategyOption>(*context,
CompletionStrategy);
+ std::string move_destination_dir;
+ context->getProperty(MoveDestinationDirectory.getName(),
move_destination_dir);
+ if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE &&
move_destination_dir.empty()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is
required when Completion Strategy is set to Move File");
+ }
+ move_confict_strategy_ =
utils::parseEnumProperty<MoveConflictStrategyOption>(*context,
MoveConflictStrategy);
+ log_level_when_file_not_found_ =
utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+ log_level_when_permission_denied_ =
utils::parseEnumProperty<LogLevelOption>(*context,
LogLevelWhenPermissionDenied);
+}
+
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const
std::shared_ptr<core::FlowFile>& flow_file) const {
+ std::string file_to_fetch_path;
+ context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
+ if (!file_to_fetch_path.empty()) {
+ return file_to_fetch_path;
+ }
+
+ flow_file->getAttribute("absolute.path", file_to_fetch_path);
+ std::string filename;
+ flow_file->getAttribute("filename", filename);
+ file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
+ return file_to_fetch_path;
+}
+
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string&
message) const {
+ switch (log_level.value()) {
+ case LogLevelOption::LOGGING_TRACE:
+ logger_->log_trace(message.c_str());
+ break;
+ case LogLevelOption::LOGGING_DEBUG:
+ logger_->log_debug(message.c_str());
+ break;
+ case LogLevelOption::LOGGING_INFO:
+ logger_->log_info(message.c_str());
+ break;
+ case LogLevelOption::LOGGING_WARN:
+ logger_->log_warn(message.c_str());
+ break;
+ case LogLevelOption::LOGGING_ERROR:
+ logger_->log_error(message.c_str());
+ break;
+ case LogLevelOption::LOGGING_OFF:
+ default:
+ break;
+ }
+}
+
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const
{
+ return move_destination_directory_ + utils::file::FileUtils::get_separator()
+ file_name;
+}
+
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+ return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name));
+}
+
+bool FetchFile::moveWouldFailWithDestinationconflict(const std::string&
file_name) const {
+ if (completion_strategy_ != CompletionStrategyOption::MOVE_FILE ||
move_confict_strategy_ != MoveConflictStrategyOption::FAIL) {
+ return false;
+ }
+
+ return moveDestinationConflicts(file_name);
+}
+
+void FetchFile::executeMoveConflictStrategy(const std::string&
file_to_fetch_path, const std::string& file_name) {
+ if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
+ auto moved_path = getMoveAbsolutePath(file_name);
+ logger_->log_info("Due to conflict replacing file '%s' by the Move
Completion Strategy", moved_path);
+ std::filesystem::rename(file_to_fetch_path, moved_path);
+ } else if (move_confict_strategy_ == MoveConflictStrategyOption::RENAME) {
+ auto generated_filename =
utils::IdGenerator::getIdGenerator()->generate().to_string();
+ logger_->log_info("Due to conflict file '%s' is moved with generated name
'%s' by the Move Completion Strategy", file_to_fetch_path, generated_filename);
+ std::filesystem::rename(file_to_fetch_path,
getMoveAbsolutePath(generated_filename));
+ } else if (move_confict_strategy_ ==
MoveConflictStrategyOption::KEEP_EXISTING) {
+ logger_->log_info("Due to conflict file '%s' is deleted by the Move
Completion Strategy", file_to_fetch_path);
+ std::filesystem::remove(file_to_fetch_path);
+ }
+}
+
+void FetchFile::processMoveCompletion(const std::string& file_to_fetch_path,
const std::string& file_name) {
+ if (!moveDestinationConflicts(file_name)) {
+ if (!utils::file::FileUtils::exists(move_destination_directory_)) {
+ std::filesystem::create_directories(move_destination_directory_);
+ }
+ auto moved_path = getMoveAbsolutePath(file_name);
+ logger_->log_info("'%s' is moved to '%s' by the Move Completion Strategy",
file_to_fetch_path, moved_path);
+ std::filesystem::rename(file_to_fetch_path, moved_path);
+ return;
+ }
+
+ executeMoveConflictStrategy(file_to_fetch_path, file_name);
+}
+
+void FetchFile::executeCompletionStrategy(const std::string&
file_to_fetch_path, const std::string& file_name) {
+ try {
+ if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE) {
+ processMoveCompletion(file_to_fetch_path, file_name);
+ } else if (completion_strategy_ == CompletionStrategyOption::DELETE_FILE) {
+ logger_->log_info("File '%s' is deleted by the Delete Completion
Strategy", file_to_fetch_path);
+ std::filesystem::remove(file_to_fetch_path);
+ }
+ } catch(const std::filesystem::filesystem_error& ex) {
+ logger_->log_warn("Executing completion strategy failed due to filesystem
error: %s", ex.what());
+ }
+}
+
+void FetchFile::onTrigger(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSession> &session) {
+ gsl_Expects(context && session);
+ logger_->log_trace("FetchFile onTrigger");
+ auto flow_file = session->get();
+ if (!flow_file) {
+ context->yield();
+ return;
+ }
+
+ const auto file_to_fetch_path = getFileToFetch(*context, flow_file);
+ if (!std::filesystem::is_regular_file(file_to_fetch_path)) {
+ logWithLevel(log_level_when_file_not_found_, "File to fetch was not found:
'" + file_to_fetch_path + "'!");
+ session->transfer(flow_file, NotFound);
+ return;
+ }
+
+ std::string path;
+ std::string file_name;
+ utils::file::getFileNameAndPath(file_to_fetch_path, path, file_name);
+
+ context->getProperty(MoveDestinationDirectory, move_destination_directory_,
flow_file);
+ if (moveWouldFailWithDestinationconflict(file_name)) {
+ logger_->log_error("Move destination (%s) conflicts with an already
existing file!", move_destination_directory_);
+ session->transfer(flow_file, Failure);
+ return;
+ }
+
+ try {
+ utils::FileReaderCallback callback(file_to_fetch_path);
+ session->write(flow_file, std::move(callback));
+ logger_->log_debug("Fetching file '%s' successful!", file_to_fetch_path);
+ session->transfer(flow_file, Success);
+ } catch (const utils::FileReaderCallbackIOError& io_error) {
+ if (io_error.error_code == EACCES) {
+ logWithLevel(log_level_when_permission_denied_, "Read permission denied
for file '" + file_to_fetch_path + "' to be fetched!");
+ session->transfer(flow_file, PermissionDenied);
+ } else {
+ logger_->log_error("Fetching file '%s' failed! %s", file_to_fetch_path,
io_error.what());
+ session->transfer(flow_file, Failure);
+ }
+ return;
+ }
+
+ executeCompletionStrategy(file_to_fetch_path, file_name);
+}
+
+REGISTER_RESOURCE(FetchFile, "Reads the contents of a file from disk and
streams it into the contents of an incoming FlowFile. "
+ "Once this is done, the file is optionally moved elsewhere or deleted to
help keep the file system organized.");
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/FetchFile.h
b/extensions/standard-processors/processors/FetchFile.h
new file mode 100644
index 0000000..3b52805
--- /dev/null
+++ b/extensions/standard-processors/processors/FetchFile.h
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Property.h"
+#include "utils/Enum.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+class FetchFile : public core::Processor {
+ public:
+ SMART_ENUM(CompletionStrategyOption,
+ (NONE, "None"),
+ (MOVE_FILE, "Move File"),
+ (DELETE_FILE, "Delete File")
+ )
+
+ SMART_ENUM(MoveConflictStrategyOption,
+ (RENAME, "Rename"),
+ (REPLACE_FILE, "Replace File"),
+ (KEEP_EXISTING, "Keep Existing"),
+ (FAIL, "Fail")
+ )
+
+ SMART_ENUM(LogLevelOption,
+ (LOGGING_TRACE, "TRACE"),
+ (LOGGING_DEBUG, "DEBUG"),
+ (LOGGING_INFO, "INFO"),
+ (LOGGING_WARN, "WARN"),
+ (LOGGING_ERROR, "ERROR"),
+ (LOGGING_OFF, "OFF")
+ )
+
+ explicit FetchFile(const std::string& name, const utils::Identifier& uuid =
{})
+ : core::Processor(name, uuid) {
+ }
+
+ EXTENSIONAPI static const core::Property FileToFetch;
+ EXTENSIONAPI static const core::Property CompletionStrategy;
+ EXTENSIONAPI static const core::Property MoveDestinationDirectory;
+ EXTENSIONAPI static const core::Property MoveConflictStrategy;
+ EXTENSIONAPI static const core::Property LogLevelWhenFileNotFound;
+ EXTENSIONAPI static const core::Property LogLevelWhenPermissionDenied;
+
+ EXTENSIONAPI static const core::Relationship Success;
+ EXTENSIONAPI static const core::Relationship NotFound;
+ EXTENSIONAPI static const core::Relationship PermissionDenied;
+ EXTENSIONAPI static const core::Relationship Failure;
+
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSessionFactory> &session_factory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) override;
+
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_REQUIRED;
+ }
+
+ private:
+ std::string getFileToFetch(core::ProcessContext& context, const
std::shared_ptr<core::FlowFile>& flow_file) const;
+ void logWithLevel(LogLevelOption log_level, const std::string& message)
const;
+ std::string getMoveAbsolutePath(const std::string& file_name) const;
+ bool moveDestinationConflicts(const std::string& file_name) const;
+ bool moveWouldFailWithDestinationconflict(const std::string& file_name)
const;
+ void executeMoveConflictStrategy(const std::string& file_to_fetch_path,
const std::string& file_name);
+ void processMoveCompletion(const std::string& file_to_fetch_path, const
std::string& file_name);
+ void executeCompletionStrategy(const std::string& file_to_fetch_path, const
std::string& file_name);
+
+ std::string move_destination_directory_;
+ CompletionStrategyOption completion_strategy_;
+ MoveConflictStrategyOption move_confict_strategy_;
+ LogLevelOption log_level_when_file_not_found_;
+ LogLevelOption log_level_when_permission_denied_;
+ std::shared_ptr<core::logging::Logger> logger_ =
core::logging::LoggerFactory<FetchFile>::getLogger();
+};
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/FetchFileTests.cpp
b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
new file mode 100644
index 0000000..dbc1762
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
@@ -0,0 +1,348 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <unordered_set>
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "core/Property.h"
+#include "core/Processor.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/FetchFile.h"
+#include "processors/PutFile.h"
+#include "utils/TestUtils.h"
+#include "utils/IntegrationTestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace {
+
+class FetchFileTestFixture {
+ public:
+ FetchFileTestFixture();
+ ~FetchFileTestFixture();
+ std::unordered_multiset<std::string> getSuccessfulFlowFileContents() const;
+ std::unordered_multiset<std::string> getFailedFlowFileContents() const;
+ std::unordered_multiset<std::string> getNotFoundFlowFileContents() const;
+#ifndef WIN32
+ std::unordered_multiset<std::string> getPermissionDeniedFlowFileContents()
const;
+#endif
+
+ protected:
+ std::unordered_multiset<std::string> getDirContents(const std::string&
dir_path) const;
+
+ TestController test_controller_;
+ std::shared_ptr<TestPlan> plan_;
+ const std::string input_dir_;
+ const std::string success_output_dir_;
+ const std::string failure_output_dir_;
+ const std::string not_found_output_dir_;
+ const std::string permission_denied_output_dir_;
+ const std::string permission_denied_file_name_;
+ const std::string input_file_name_;
+ const std::string file_content_;
+ std::shared_ptr<core::Processor> fetch_file_processor_;
+ std::shared_ptr<core::Processor> update_attribute_processor_;
+};
+
+FetchFileTestFixture::FetchFileTestFixture()
+ : plan_(test_controller_.createPlan()),
+ input_dir_(test_controller_.createTempDirectory()),
+ success_output_dir_(test_controller_.createTempDirectory()),
+ failure_output_dir_(test_controller_.createTempDirectory()),
+ not_found_output_dir_(test_controller_.createTempDirectory()),
+ permission_denied_output_dir_(test_controller_.createTempDirectory()),
+ permission_denied_file_name_("permission_denied.txt"),
+ input_file_name_("test.txt"),
+ file_content_("The quick brown fox jumps over the lazy dog\n") {
+ LogTestController::getInstance().setTrace<TestPlan>();
+ LogTestController::getInstance().setTrace<minifi::processors::FetchFile>();
+
LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
+
+ REQUIRE(!input_dir_.empty());
+ REQUIRE(!success_output_dir_.empty());
+ REQUIRE(!failure_output_dir_.empty());
+ REQUIRE(!not_found_output_dir_.empty());
+ REQUIRE(!permission_denied_output_dir_.empty());
+
+ auto generate_flow_file_processor = plan_->addProcessor("GenerateFlowFile",
"GenerateFlowFile");
+ plan_->setProperty(generate_flow_file_processor,
org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(),
"0B");
+ update_attribute_processor_ = plan_->addProcessor("UpdateAttribute",
"UpdateAttribute", core::Relationship("success", "description"), true);
+ plan_->setProperty(update_attribute_processor_, "absolute.path", input_dir_,
true);
+ plan_->setProperty(update_attribute_processor_, "filename",
input_file_name_, true);
+
+ fetch_file_processor_ = plan_->addProcessor("FetchFile", "FetchFile",
core::Relationship("success", "description"), true);
+
+ auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", {
{"success", "d"} }, false);
+ plan_->addConnection(fetch_file_processor_, {"success", "d"},
success_putfile);
+ success_putfile->setAutoTerminatedRelationships({{"success", "d"},
{"failure", "d"}});
+ plan_->setProperty(success_putfile,
org::apache::nifi::minifi::processors::PutFile::Directory.getName(),
success_output_dir_);
+
+ auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", {
{"success", "d"} }, false);
+ plan_->addConnection(fetch_file_processor_, {"failure", "d"},
failure_putfile);
+ failure_putfile->setAutoTerminatedRelationships({{"success", "d"},
{"failure", "d"}});
+ plan_->setProperty(failure_putfile,
org::apache::nifi::minifi::processors::PutFile::Directory.getName(),
failure_output_dir_);
+
+ auto not_found_putfile = plan_->addProcessor("PutFile", "NotFoundPutFile", {
{"success", "d"} }, false);
+ plan_->addConnection(fetch_file_processor_, {"not.found", "d"},
not_found_putfile);
+ not_found_putfile->setAutoTerminatedRelationships({{"success", "d"},
{"not.found", "d"}});
+ plan_->setProperty(not_found_putfile,
org::apache::nifi::minifi::processors::PutFile::Directory.getName(),
not_found_output_dir_);
+
+ auto permission_denied_putfile = plan_->addProcessor("PutFile",
"PermissionDeniedPutFile", { {"success", "d"} }, false);
+ plan_->addConnection(fetch_file_processor_, {"permission.denied", "d"},
permission_denied_putfile);
+ not_found_putfile->setAutoTerminatedRelationships({{"success", "d"},
{"permission.denied", "d"}});
+ plan_->setProperty(permission_denied_putfile,
org::apache::nifi::minifi::processors::PutFile::Directory.getName(),
permission_denied_output_dir_);
+
+ utils::putFileToDir(input_dir_, input_file_name_, file_content_);
+ utils::putFileToDir(input_dir_, permission_denied_file_name_, file_content_);
+#ifndef WIN32
+ utils::file::FileUtils::set_permissions(input_dir_ +
utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0);
+#endif
+}
+
+FetchFileTestFixture::~FetchFileTestFixture() {
+#ifndef WIN32
+ utils::file::FileUtils::set_permissions(input_dir_ +
utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0644);
+#endif
+}
+
+std::unordered_multiset<std::string>
FetchFileTestFixture::getDirContents(const std::string& dir_path) const {
+ std::unordered_multiset<std::string> file_contents;
+
+ auto lambda = [&file_contents](const std::string& path, const std::string&
filename) -> bool {
+ std::ifstream is(path + utils::file::FileUtils::get_separator() +
filename, std::ifstream::binary);
+ file_contents.insert(std::string((std::istreambuf_iterator<char>(is)),
std::istreambuf_iterator<char>()));
+ return true;
+ };
+
+ utils::file::FileUtils::list_dir(dir_path, lambda, plan_->getLogger(),
false);
+ return file_contents;
+}
+
+std::unordered_multiset<std::string>
FetchFileTestFixture::getSuccessfulFlowFileContents() const {
+ return getDirContents(success_output_dir_);
+}
+
+std::unordered_multiset<std::string>
FetchFileTestFixture::getFailedFlowFileContents() const {
+ return getDirContents(failure_output_dir_);
+}
+
+std::unordered_multiset<std::string>
FetchFileTestFixture::getNotFoundFlowFileContents() const {
+ return getDirContents(not_found_output_dir_);
+}
+
+#ifndef WIN32
+std::unordered_multiset<std::string>
FetchFileTestFixture::getPermissionDeniedFlowFileContents() const {
+ return getDirContents(permission_denied_output_dir_);
+}
+#endif
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but
non-existent file path", "[testFetchFile]") {
+ plan_->setProperty(update_attribute_processor_, "filename",
"non_existent.file", true);
+ test_controller_.runSession(plan_);
+ auto file_contents = getNotFoundFlowFileContents();
+ std::unordered_multiset<std::string> expected{""};
+ REQUIRE(file_contents == expected);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ REQUIRE(verifyLogLinePresenceInPollTime(1s, "[error] File to fetch was not
found"));
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "FileToFetch property set to a
non-existent file path", "[testFetchFile]") {
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(),
"/tmp/non_existent.file");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound.getName(),
"INFO");
+ test_controller_.runSession(plan_);
+ auto file_contents = getNotFoundFlowFileContents();
+ std::unordered_multiset<std::string> expected{""};
+ REQUIRE(file_contents == expected);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ REQUIRE(verifyLogLinePresenceInPollTime(1s, "[info] File to fetch was not
found"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchFileTestFixture, "Permission denied to read file",
"[testFetchFile]") {
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(),
+ input_dir_ + utils::file::FileUtils::get_separator() +
permission_denied_file_name_);
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied.getName(),
"WARN");
+ test_controller_.runSession(plan_);
+ auto file_contents = getPermissionDeniedFlowFileContents();
+ std::unordered_multiset<std::string> expected{""};
+ REQUIRE(file_contents == expected);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ REQUIRE(verifyLogLinePresenceInPollTime(1s, "[warning] Read permission
denied for file"));
+}
+#endif
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default file
path", "[testFetchFile]") {
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+ REQUIRE(utils::file::FileUtils::exists(input_dir_ +
utils::file::FileUtils::get_separator() + input_file_name_));
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file from a custom
path", "[testFetchFile]") {
+ REQUIRE(0 == utils::file::FileUtils::create_dir(input_dir_ +
utils::file::FileUtils::get_separator() + "sub"));
+ utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() +
"sub", input_file_name_, file_content_);
+ auto file_path = input_dir_ + utils::file::FileUtils::get_separator() +
"sub" + utils::file::FileUtils::get_separator() + input_file_name_;
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(),
file_path);
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+ REQUIRE(utils::file::FileUtils::exists(file_path));
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Flow scheduling fails due to missing
move destination directory when completion strategy is set to move file",
"[testFetchFile]") {
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(),
"Move File");
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_), minifi::Exception);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict",
"[testFetchFile]") {
+ auto move_dir = test_controller_.createTempDirectory();
+ utils::putFileToDir(move_dir, input_file_name_, "old content");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(),
"Move File");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(),
move_dir);
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(),
"Fail");
+ test_controller_.runSession(plan_);
+ auto file_contents = getFailedFlowFileContents();
+ std::unordered_multiset<std::string> expected{""};
+ REQUIRE(file_contents == expected);
+
+ std::ifstream is(move_dir + utils::file::FileUtils::get_separator() +
input_file_name_, std::ifstream::binary);
+ REQUIRE(std::string((std::istreambuf_iterator<char>(is)),
std::istreambuf_iterator<char>()) == "old content");
+ REQUIRE(utils::file::FileUtils::exists(input_dir_ +
utils::file::FileUtils::get_separator() + input_file_name_));
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Move specific properties are ignored
when completion strategy is not move file", "[testFetchFile]") {
+ auto move_dir = test_controller_.createTempDirectory();
+ utils::putFileToDir(move_dir, input_file_name_, "old content");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(),
move_dir);
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(),
"Fail");
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved
with replace file", "[testFetchFile]") {
+ auto move_dir = test_controller_.createTempDirectory();
+ utils::putFileToDir(move_dir, input_file_name_, "old content");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(),
"Move File");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(),
move_dir);
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(),
"Replace File");
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+ REQUIRE(!utils::file::FileUtils::exists(input_dir_ +
utils::file::FileUtils::get_separator() + input_file_name_));
+
+ std::ifstream is(move_dir + utils::file::FileUtils::get_separator() +
input_file_name_, std::ifstream::binary);
+ REQUIRE(std::string((std::istreambuf_iterator<char>(is)),
std::istreambuf_iterator<char>()) == file_content_);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved
with renaming file to a new random filename", "[testFetchFile]") {
+ auto move_dir = test_controller_.createTempDirectory();
+ utils::putFileToDir(move_dir, input_file_name_, "old content");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(),
"Move File");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(),
move_dir);
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(),
"Rename");
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+ REQUIRE(!utils::file::FileUtils::exists(input_dir_ +
utils::file::FileUtils::get_separator() + input_file_name_));
+
+
+ auto move_dir_contents = getDirContents(move_dir);
+ expected = {"old content", file_content_};
+ REQUIRE(move_dir_contents == expected);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved
with deleting the new file and keeping the old one", "[testFetchFile]") {
+ auto move_dir = test_controller_.createTempDirectory();
+ utils::putFileToDir(move_dir, input_file_name_, "old content");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(),
"Move File");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(),
move_dir);
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(),
"Keep Existing");
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+ REQUIRE(!utils::file::FileUtils::exists(input_dir_ +
utils::file::FileUtils::get_separator() + input_file_name_));
+
+ std::ifstream is(move_dir + utils::file::FileUtils::get_separator() +
input_file_name_, std::ifstream::binary);
+ REQUIRE(std::string((std::istreambuf_iterator<char>(is)),
std::istreambuf_iterator<char>()) == "old content");
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is moved to a new
directory after flow completion", "[testFetchFile]") {
+ auto move_dir = test_controller_.createTempDirectory();
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(),
"Move File");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(),
move_dir);
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+ REQUIRE(!utils::file::FileUtils::exists(input_dir_ +
utils::file::FileUtils::get_separator() + input_file_name_));
+
+ std::ifstream is(move_dir + utils::file::FileUtils::get_separator() +
input_file_name_, std::ifstream::binary);
+ REQUIRE(std::string((std::istreambuf_iterator<char>(is)),
std::istreambuf_iterator<char>()) == file_content_);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "After flow completion the fetched file
is moved to a non-existent directory which is created by the flow",
"[testFetchFile]") {
+ auto move_dir = test_controller_.createTempDirectory();
+ move_dir = move_dir + utils::file::FileUtils::get_separator() + "temp";
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(),
"Move File");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(),
move_dir);
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+ REQUIRE(!utils::file::FileUtils::exists(input_dir_ +
utils::file::FileUtils::get_separator() + input_file_name_));
+
+ std::ifstream is(move_dir + utils::file::FileUtils::get_separator() +
input_file_name_, std::ifstream::binary);
+ REQUIRE(std::string((std::istreambuf_iterator<char>(is)),
std::istreambuf_iterator<char>()) == file_content_);
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchFileTestFixture, "Move completion strategy failure due
to filesystem error still succeeds flow", "[testFetchFile]") {
+ auto move_dir = test_controller_.createTempDirectory();
+ utils::file::FileUtils::set_permissions(move_dir, 0);
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(),
"Move File");
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(),
move_dir);
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+ REQUIRE(utils::file::FileUtils::exists(input_dir_ +
utils::file::FileUtils::get_separator() + input_file_name_));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ REQUIRE(verifyLogLinePresenceInPollTime(1s, "completion strategy failed"));
+ utils::file::FileUtils::set_permissions(move_dir, 0644);
+}
+#endif
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is deleted after flow
completion", "[testFetchFile]") {
+ plan_->setProperty(fetch_file_processor_,
org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(),
"Delete File");
+ test_controller_.runSession(plan_);
+ auto file_contents = getSuccessfulFlowFileContents();
+ std::unordered_multiset<std::string> expected{file_content_};
+ REQUIRE(file_contents == expected);
+ REQUIRE(!utils::file::FileUtils::exists(input_dir_ +
utils::file::FileUtils::get_separator() + input_file_name_));
+}
+
+} // namespace
diff --git a/libminifi/include/utils/FileReaderCallback.h
b/libminifi/include/utils/FileReaderCallback.h
index 398bb14..a368222 100644
--- a/libminifi/include/utils/FileReaderCallback.h
+++ b/libminifi/include/utils/FileReaderCallback.h
@@ -45,7 +45,8 @@ class FileReaderCallback : public OutputStreamCallback {
class FileReaderCallbackIOError : public std::runtime_error {
public:
- explicit FileReaderCallbackIOError(const std::string& message) :
std::runtime_error{message} {}
+ explicit FileReaderCallbackIOError(const std::string& message, int code) :
std::runtime_error{message}, error_code(code) {}
+ int error_code;
};
} // namespace utils
diff --git a/libminifi/src/utils/FileReaderCallback.cpp
b/libminifi/src/utils/FileReaderCallback.cpp
index e656537..ee59586 100644
--- a/libminifi/src/utils/FileReaderCallback.cpp
+++ b/libminifi/src/utils/FileReaderCallback.cpp
@@ -40,7 +40,7 @@ FileReaderCallback::FileReaderCallback(const std::string&
file_name)
logger_->log_debug("Opening %s", file_name);
input_stream_.open(file_name.c_str(), std::fstream::in |
std::fstream::binary);
if (!input_stream_.is_open()) {
- throw FileReaderCallbackIOError(StringUtils::join_pack("Error opening
file: ", std::strerror(errno)));
+ throw FileReaderCallbackIOError(StringUtils::join_pack("Error opening
file: ", std::strerror(errno)), errno);
}
}
@@ -51,7 +51,7 @@ int64_t FileReaderCallback::process(const
std::shared_ptr<io::BaseStream>& outpu
while (input_stream_.good()) {
input_stream_.read(buffer.data(), buffer.size());
if (input_stream_.bad()) {
- throw FileReaderCallbackIOError(StringUtils::join_pack("Error reading
file: ", std::strerror(errno)));
+ throw FileReaderCallbackIOError(StringUtils::join_pack("Error reading
file: ", std::strerror(errno)), errno);
}
const auto num_bytes_read = input_stream_.gcount();
logger_->log_trace("Read %jd bytes of input",
std::intmax_t{num_bytes_read});