This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new b723bda  MINIFICPP-1412 - Create ResouceClaim if processor fails to do 
so
b723bda is described below

commit b723bdaec323deed340312f953c64260d405fda8
Author: Adam Debreceni <[email protected]>
AuthorDate: Thu Nov 26 10:58:45 2020 +0100

    MINIFICPP-1412 - Create ResouceClaim if processor fails to do so
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #944
---
 .../processors/GenerateFlowFile.cpp                    | 16 ++++++++--------
 .../standard-processors/tests/unit/ProcessorTests.cpp  |  4 +++-
 libminifi/include/core/ProcessSession.h                |  4 ++++
 libminifi/src/core/ProcessSession.cpp                  | 18 ++++++++++++++++++
 4 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp 
b/extensions/standard-processors/processors/GenerateFlowFile.cpp
index 08abbbd..3008e23 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.cpp
+++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp
@@ -124,16 +124,16 @@ void GenerateFlowFile::onTrigger(core::ProcessContext 
*context, core::ProcessSes
       logger_->log_error("Failed to create flowfile!");
       return;
     }
-    if (fileSize_ > 0) {
-      if (uniqueFlowFile_) {
-        std::vector<char> data(gsl::narrow<size_t>(fileSize_));
+    if (uniqueFlowFile_) {
+      std::vector<char> data(gsl::narrow<size_t>(fileSize_));
+      if (fileSize_ > 0) {
         generateData(data, textData_);
-        GenerateFlowFile::WriteCallback callback(std::move(data));
-        session->write(flowFile, &callback);
-      } else {
-        GenerateFlowFile::WriteCallback callback(data_);
-        session->write(flowFile, &callback);
       }
+      GenerateFlowFile::WriteCallback callback(std::move(data));
+      session->write(flowFile, &callback);
+    } else {
+      GenerateFlowFile::WriteCallback callback(data_);
+      session->write(flowFile, &callback);
     }
     session->transfer(flowFile, Success);
   }
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp 
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 9fefd92..5384459 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -41,6 +41,7 @@
 #include "core/ProcessorNode.h"
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 #include "utils/PropertyErrors.h"
+#include "utils/IntegrationTestUtils.h"
 
 TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
   TestController testController;
@@ -235,6 +236,7 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
   TestController testController;
   
LogTestController::getInstance().setDebug<minifi::processors::GenerateFlowFile>();
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<minifi::Configure>());
   std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::GenerateFlowFile>("GFF");
   processor->initialize();
   processor->setProperty(processors::GenerateFlowFile::BatchSize, "10");
@@ -539,7 +541,7 @@ TEST_CASE("TestEmptyContent", "[emptyContent]") {
 
   plan->runNextProcessor();
 
-  // segfault
+  REQUIRE(utils::verifyLogLinePresenceInPollTime(std::chrono::seconds{0}, "did 
not create a ResourceClaim"));
 
   LogTestController::getInstance().reset();
 }
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index 60ddafc..743adf8 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -170,6 +170,10 @@ class ProcessSession : public ReferenceContainer {
   void persistFlowFilesBeforeTransfer(
       std::map<std::shared_ptr<Connectable>, 
std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap,
       const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles);
+
+  void ensureNonNullResourceClaim(
+      const std::map<std::shared_ptr<Connectable>, 
std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap);
+
   // Clone the flow file during transfer to multiple connections for a 
relationship
   std::shared_ptr<core::FlowFile> cloneDuringTransfer(const 
std::shared_ptr<core::FlowFile> &parent);
   // ProcessContext
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 6a8273a..7be0feb 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -744,6 +744,8 @@ void ProcessSession::commit() {
         }
     }
 
+    ensureNonNullResourceClaim(connectionQueues);
+
     content_session_->commit();
 
     persistFlowFilesBeforeTransfer(connectionQueues, _updatedFlowFiles);
@@ -891,6 +893,22 @@ void ProcessSession::persistFlowFilesBeforeTransfer(
   }
 }
 
+void ProcessSession::ensureNonNullResourceClaim(
+    const std::map<std::shared_ptr<Connectable>, 
std::vector<std::shared_ptr<core::FlowFile>>> &transactionMap) {
+  for (auto& transaction : transactionMap) {
+    for (auto& flowFile : transaction.second) {
+      auto claim = flowFile->getResourceClaim();
+      if (!claim) {
+        logger_->log_debug("Processor %s (%s) did not create a ResourceClaim, 
creating an empty one",
+                           process_context_->getProcessorNode()->getUUIDStr(),
+                           process_context_->getProcessorNode()->getName());
+        OutputStreamPipe 
emptyStreamCallback(std::make_shared<io::BufferStream>());
+        write(flowFile, &emptyStreamCallback);
+      }
+    }
+  }
+}
+
 std::shared_ptr<core::FlowFile> ProcessSession::get() {
   std::shared_ptr<Connectable> first = 
process_context_->getProcessorNode()->pickIncomingConnection();
 

Reply via email to