This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 1a8ae2f30 MINIFICPP-2291 Fix the site-to-site transfer of large files
1a8ae2f30 is described below
commit 1a8ae2f30fd6465cfa5dce062b70fbc846ff8940
Author: Ferenc Gerlits <[email protected]>
AuthorDate: Thu Jan 25 16:37:52 2024 +0100
MINIFICPP-2291 Fix the site-to-site transfer of large files
Previously, the site-to-site client treated the
TRANSACTION_FINISHED_BUT_DESTINATION_FULL
message from the server as a failure, and kept rolling back and retrying
the flow file.
Now, we treat it as success but add a yield so the server has time to clear
the incoming
connection, the same way as NiFi does.
Closes #1720
Signed-off-by: Marton Szasz <[email protected]>
---
docker/test/integration/features/s2s.feature | 15 +++++++++++++++
.../flow_serialization/Nifi_flow_json_serializer.py | 4 ++--
extensions/standard-processors/processors/GetFile.cpp | 2 +-
libminifi/include/sitetosite/SiteToSiteClient.h | 2 +-
libminifi/src/core/FlowConfiguration.cpp | 2 +-
libminifi/src/sitetosite/RawSocketProtocol.cpp | 2 +-
libminifi/src/sitetosite/SiteToSiteClient.cpp | 15 ++++++++++-----
7 files changed, 31 insertions(+), 11 deletions(-)
diff --git a/docker/test/integration/features/s2s.feature
b/docker/test/integration/features/s2s.feature
index 6b5f2e6f0..c236f2ac7 100644
--- a/docker/test/integration/features/s2s.feature
+++ b/docker/test/integration/features/s2s.feature
@@ -34,6 +34,21 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S
protocol
When both instances start up
Then a flowfile with the content "test" is placed in the monitored
directory in less than 90 seconds
+ And the Minifi logs do not contain the following message: "ProcessSession
rollback" after 1 seconds
+
+ Scenario: A MiNiFi instance produces and transfers a large data file to a
NiFi instance via s2s
+ Given a GetFile processor with the "Input Directory" property set to
"/tmp/input"
+ And a file with the content "this is a very long file we want to send by
site-to-site" is present in "/tmp/input"
+ And a RemoteProcessGroup node opened on
"http://nifi-${feature_id}:8080/nifi"
+ And the "success" relationship of the GetFile processor is connected to
the input port on the RemoteProcessGroup
+
+ And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" on
port 8080
+ And a PutFile processor with the "Directory" property set to "/tmp/output"
in the "nifi" flow
+ And the "success" relationship of the from-minifi is connected to the
PutFile
+
+ When both instances start up
+ Then a flowfile with the content "this is a very long file we want to send
by site-to-site" is placed in the monitored directory in less than 90 seconds
+ And the Minifi logs do not contain the following message: "ProcessSession
rollback" after 1 seconds
Scenario: Zero length files are transfered between via s2s if the "drop
empty" connection property is false
Given a MiNiFi CPP server with yaml config
diff --git
a/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py
index 2c24f2d36..fa3f36ac6 100644
---
a/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py
+++
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py
@@ -210,8 +210,8 @@ class Nifi_flow_json_serializer:
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [conn_name] if not
isinstance(connectable, InputPort) else [""],
- "backPressureObjectThreshold": 10000,
- "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10,
+ "backPressureDataSizeThreshold": "50 B",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
diff --git a/extensions/standard-processors/processors/GetFile.cpp
b/extensions/standard-processors/processors/GetFile.cpp
index b7c5e174b..318996730 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -78,7 +78,7 @@ void GetFile::onSchedule(core::ProcessContext& context,
core::ProcessSessionFact
if (auto directory_str = context.getProperty(Directory)) {
if (!utils::file::is_directory(*directory_str)) {
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory \"" + value
+ "\" is not a directory");
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION,
utils::string::join_pack("Input Directory \"", *directory_str, "\" is not a
directory"));
}
request_.inputDirectory = *directory_str;
} else {
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h
b/libminifi/include/sitetosite/SiteToSiteClient.h
index a39e4d008..53f4b2f29 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -201,7 +201,7 @@ class SiteToSiteClient : public core::Connectable {
// Cancel the transaction
virtual void cancel(const utils::Identifier &transactionID);
// Complete the transaction
- virtual bool complete(const utils::Identifier &transactionID);
+ virtual bool complete(core::ProcessContext& context, const utils::Identifier
&transactionID);
// Error the transaction
virtual void error(const utils::Identifier &transactionID);
diff --git a/libminifi/src/core/FlowConfiguration.cpp
b/libminifi/src/core/FlowConfiguration.cpp
index f955372b0..81729a1fb 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -143,7 +143,7 @@ bool FlowConfiguration::persist(const std::string&
serialized_flow) {
}
const bool status = filesystem_->write(*config_path_, serialized_flow);
- logger_->log_info("Result of updating the config file {}: {}", config_path_,
status ? "success" : "failure");
+ logger_->log_info("Result of updating the config file {}: {}",
*config_path_, status ? "success" : "failure");
checksum_calculator_.invalidateChecksum();
return status;
}
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp
b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 040029888..1a28cfc35 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -595,7 +595,7 @@ bool
RawSiteToSiteClient::transmitPayload(core::ProcessContext& context, core::P
if (!confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " +
transactionID.to_string());
}
- if (!complete(transactionID)) {
+ if (!complete(context, transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " +
transactionID.to_string());
}
logger_->log_info("Site2Site transaction {} successfully send flow record
{} content bytes {}", transactionID.to_string(),
transaction->current_transfers_, transaction->_bytes);
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp
b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 429ac70f5..725e200f3 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -164,7 +164,7 @@ bool
SiteToSiteClient::transferFlowFiles(core::ProcessContext& context, core::Pr
if (!confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " +
transactionID.to_string());
}
- if (!complete(transactionID)) {
+ if (!complete(context, transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " +
transactionID.to_string());
}
logger_->log_debug("Site2Site transaction {} successfully sent flow record
{}, content bytes {}", transactionID.to_string(),
transaction->total_transfers_, transaction->_bytes);
@@ -336,7 +336,7 @@ void SiteToSiteClient::error(const utils::Identifier&
transactionID) {
}
// Complete the transaction
-bool SiteToSiteClient::complete(const utils::Identifier& transactionID) {
+bool SiteToSiteClient::complete(core::ProcessContext& context, const
utils::Identifier& transactionID) {
int ret = 0;
std::shared_ptr<Transaction> transaction = nullptr;
@@ -382,12 +382,17 @@ bool SiteToSiteClient::complete(const utils::Identifier&
transactionID) {
if (ret <= 0)
return false;
- if (code == TRANSACTION_FINISHED) {
+ if (code == TRANSACTION_FINISHED || code ==
TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
logger_->log_info("Site2Site transaction {} peer finished transaction",
transactionID.to_string());
transaction->_state = TRANSACTION_COMPLETED;
+
+ if (code == TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
+ logger_->log_info("Site2Site transaction {} reported destination full,
yielding", transactionID.to_string());
+ context.yield();
+ }
return true;
} else {
- logger_->log_warn("Site2Site transaction {} peer unknown respond code
{}", transactionID.to_string(), magic_enum::enum_underlying(code));
+ logger_->log_warn("Site2Site transaction {} peer unexpected respond code
{}: {}", transactionID.to_string(), magic_enum::enum_underlying(code),
magic_enum::enum_name(code));
return false;
}
}
@@ -718,7 +723,7 @@ bool
SiteToSiteClient::receiveFlowFiles(core::ProcessContext& context, core::Pro
if (transfers > 0 && !confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
}
- if (!complete(transactionID)) {
+ if (!complete(context, transactionID)) {
std::stringstream transaction_str;
transaction_str << "Complete Transaction " << transactionID.to_string()
<< " Failed";
throw Exception(SITE2SITE_EXCEPTION, transaction_str.str());