This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new b723bda MINIFICPP-1412 - Create ResourceClaim if processor fails to do
so
b723bda is described below
commit b723bdaec323deed340312f953c64260d405fda8
Author: Adam Debreceni <[email protected]>
AuthorDate: Thu Nov 26 10:58:45 2020 +0100
MINIFICPP-1412 - Create ResourceClaim if processor fails to do so
Signed-off-by: Arpad Boda <[email protected]>
This closes #944
---
.../processors/GenerateFlowFile.cpp | 16 ++++++++--------
.../standard-processors/tests/unit/ProcessorTests.cpp | 4 +++-
libminifi/include/core/ProcessSession.h | 4 ++++
libminifi/src/core/ProcessSession.cpp | 18 ++++++++++++++++++
4 files changed, 33 insertions(+), 9 deletions(-)
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp
b/extensions/standard-processors/processors/GenerateFlowFile.cpp
index 08abbbd..3008e23 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.cpp
+++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp
@@ -124,16 +124,16 @@ void GenerateFlowFile::onTrigger(core::ProcessContext
*context, core::ProcessSes
logger_->log_error("Failed to create flowfile!");
return;
}
- if (fileSize_ > 0) {
- if (uniqueFlowFile_) {
- std::vector<char> data(gsl::narrow<size_t>(fileSize_));
+ if (uniqueFlowFile_) {
+ std::vector<char> data(gsl::narrow<size_t>(fileSize_));
+ if (fileSize_ > 0) {
generateData(data, textData_);
- GenerateFlowFile::WriteCallback callback(std::move(data));
- session->write(flowFile, &callback);
- } else {
- GenerateFlowFile::WriteCallback callback(data_);
- session->write(flowFile, &callback);
}
+ GenerateFlowFile::WriteCallback callback(std::move(data));
+ session->write(flowFile, &callback);
+ } else {
+ GenerateFlowFile::WriteCallback callback(data_);
+ session->write(flowFile, &callback);
}
session->transfer(flowFile, Success);
}
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 9fefd92..5384459 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -41,6 +41,7 @@
#include "core/ProcessorNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "utils/PropertyErrors.h"
+#include "utils/IntegrationTestUtils.h"
TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
TestController testController;
@@ -235,6 +236,7 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GenerateFlowFile>();
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<minifi::Configure>());
std::shared_ptr<core::Processor> processor =
std::make_shared<org::apache::nifi::minifi::processors::GenerateFlowFile>("GFF");
processor->initialize();
processor->setProperty(processors::GenerateFlowFile::BatchSize, "10");
@@ -539,7 +541,7 @@ TEST_CASE("TestEmptyContent", "[emptyContent]") {
plan->runNextProcessor();
- // segfault
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(std::chrono::seconds{0}, "did
not create a ResourceClaim"));
LogTestController::getInstance().reset();
}
diff --git a/libminifi/include/core/ProcessSession.h
b/libminifi/include/core/ProcessSession.h
index 60ddafc..743adf8 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -170,6 +170,10 @@ class ProcessSession : public ReferenceContainer {
void persistFlowFilesBeforeTransfer(
std::map<std::shared_ptr<Connectable>,
std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap,
const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles);
+
+ void ensureNonNullResourceClaim(
+ const std::map<std::shared_ptr<Connectable>,
std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap);
+
// Clone the flow file during transfer to multiple connections for a
relationship
std::shared_ptr<core::FlowFile> cloneDuringTransfer(const
std::shared_ptr<core::FlowFile> &parent);
// ProcessContext
diff --git a/libminifi/src/core/ProcessSession.cpp
b/libminifi/src/core/ProcessSession.cpp
index 6a8273a..7be0feb 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -744,6 +744,8 @@ void ProcessSession::commit() {
}
}
+ ensureNonNullResourceClaim(connectionQueues);
+
content_session_->commit();
persistFlowFilesBeforeTransfer(connectionQueues, _updatedFlowFiles);
@@ -891,6 +893,22 @@ void ProcessSession::persistFlowFilesBeforeTransfer(
}
}
+void ProcessSession::ensureNonNullResourceClaim(  // For each flow file in transactionMap that has no ResourceClaim, log it and pipe a newly created io::BufferStream into it via write().
+ const std::map<std::shared_ptr<Connectable>,
std::vector<std::shared_ptr<core::FlowFile>>> &transactionMap) {  // transactionMap: flow-file lists keyed by Connectable; commit() passes connectionQueues here before content_session_->commit()
+ for (auto& transaction : transactionMap) {
+ for (auto& flowFile : transaction.second) {
+ auto claim = flowFile->getResourceClaim();
+ if (!claim) {  // only flow files without a claim are touched; the rest are left as they are
+ logger_->log_debug("Processor %s (%s) did not create a ResourceClaim,
creating an empty one",
+ process_context_->getProcessorNode()->getUUIDStr(),  // fills the first %s
+ process_context_->getProcessorNode()->getName());  // fills the second %s (the one in parentheses)
+ OutputStreamPipe
emptyStreamCallback(std::make_shared<io::BufferStream>());  // source stream is freshly constructed; nothing is added to it in this function
+ write(flowFile, &emptyStreamCallback);  // NOTE(review): presumably this write is what attaches the empty claim - confirm in ProcessSession::write
+ }
+ }
+ }
+}
+
std::shared_ptr<core::FlowFile> ProcessSession::get() {
std::shared_ptr<Connectable> first =
process_context_->getProcessorNode()->pickIncomingConnection();