This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit d15cc2546ebadb7c929fc58949e41964345ef58f
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Jul 24 11:49:50 2023 +0200

    MINIFICPP-2173 Fix MergeContent error handling issues
    
    - Remove the intermediate `ProcessSession` that was created for every processed bin
    - Roll back the flow files of a merge if the merge fails with an exception, and put them back into the ready bins
    - Remove `merge_flow` from the session if the merge fails, to avoid an exception being thrown because no transfer relationship was set for it
    - Instead of yielding, continue processing the bins if offering one of the flow files to a bin fails
    - Log the stream size and offset values when `StreamSlice` creation fails
    - Remove unused function parameters and use references instead of raw 
pointers
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    This closes #1616
---
 extensions/libarchive/BinFiles.cpp      | 127 +++++++++++++++++---------------
 extensions/libarchive/BinFiles.h        |  37 +++-------
 extensions/libarchive/MergeContent.cpp  |  46 ++++++------
 extensions/libarchive/MergeContent.h    |  25 +++++--
 libminifi/include/core/ProcessSession.h |   2 +
 libminifi/src/core/ProcessSession.cpp   |   5 ++
 libminifi/src/io/StreamSlice.cpp        |   2 +-
 libminifi/test/unit/StreamTests.cpp     |   4 +-
 8 files changed, 133 insertions(+), 115 deletions(-)

diff --git a/extensions/libarchive/BinFiles.cpp 
b/extensions/libarchive/BinFiles.cpp
index 498ee4b3d..1b0f61e1d 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -80,7 +80,7 @@ void BinFiles::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFac
   }
 }
 
-void BinFiles::preprocessFlowFile(core::ProcessContext* /*context*/, 
core::ProcessSession* /*session*/, const std::shared_ptr<core::FlowFile>& flow) 
{
+void BinFiles::preprocessFlowFile(const std::shared_ptr<core::FlowFile>& flow) 
{
   // handle backward compatibility with old segment attributes
   std::string value;
   if (!flow->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, value) && 
flow->getAttribute(BinFiles::SEGMENT_COUNT_ATTRIBUTE, value)) {
@@ -156,6 +156,11 @@ void 
BinManager::getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins) {
   }
 }
 
+void BinManager::addReadyBin(std::unique_ptr<Bin> ready_bin) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  readyBin_.push_back(std::move(ready_bin));
+}
+
 bool BinManager::offer(const std::string &group, const 
std::shared_ptr<core::FlowFile>& flow) {
   std::lock_guard < std::mutex > lock(mutex_);
   if (flow->getSize() > maxSize_) {
@@ -203,93 +208,97 @@ bool BinManager::offer(const std::string &group, const 
std::shared_ptr<core::Flo
   return true;
 }
 
-void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) {
-  // Rollback is not viable for this processor!!
-  {
-    // process resurrected FlowFiles first
-    auto flowFiles = file_store_.getNewFlowFiles();
-    // these are already processed FlowFiles, that we own
-    bool hadFailure = false;
-    for (auto &file : flowFiles) {
-      std::string groupId = getGroupId(context.get(), file);
-      bool offer = this->binManager_.offer(groupId, file);
-      if (!offer) {
-        session->transfer(file, Failure);
-        hadFailure = true;
-      } else {
-        // no need to route successfully captured such files as we already own 
them
-      }
-    }
-    if (hadFailure) {
-      context->yield();
-      return;
+bool BinFiles::resurrectFlowFiles(core::ProcessSession &session) {
+  auto flow_files = file_store_.getNewFlowFiles();
+  // these are already processed FlowFiles, that we own
+  bool had_failure = false;
+  for (auto &file : flow_files) {
+    std::string group_id = getGroupId(file);
+    if (!binManager_.offer(group_id, file)) {
+      session.transfer(file, Failure);
+      had_failure = true;
     }
+    // no need to route successfully captured such files as we already own 
them in the Self relationship
   }
+  return had_failure;
+}
 
+void BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) {
   for (size_t i = 0; i < batchSize_; ++i) {
-    auto flow = session->get();
+    auto flow = session.get();
 
     if (flow == nullptr) {
       break;
     }
 
-    preprocessFlowFile(context.get(), session.get(), flow);
-    std::string groupId = getGroupId(context.get(), flow);
+    preprocessFlowFile(flow);
+    std::string group_id = getGroupId(flow);
 
-    bool offer = this->binManager_.offer(groupId, flow);
+    bool offer = binManager_.offer(group_id, flow);
     if (!offer) {
-      session->transfer(flow, Failure);
-      context->yield();
-      return;
+      session.transfer(flow, Failure);
+      continue;
+    }
+    session.transfer(flow, Self);
+  }
+  session.commit();
+}
+
+void BinFiles::processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, 
core::ProcessSession &session) {
+  while (!ready_bins.empty()) {
+    std::unique_ptr<Bin> bin = std::move(ready_bins.front());
+    ready_bins.pop_front();
+
+    try {
+      addFlowsToSession(session, bin);
+      logger_->log_debug("BinFiles start to process bin %s for group %s", 
bin->getUUIDStr(), bin->getGroupId());
+      if (!processBin(session, bin))
+        transferFlowsToFail(session, bin);
+      session.commit();
+    } catch(const std::exception& ex) {
+      logger_->log_error("Caught Exception type: '%s' while merging ready bin: 
'%s'", typeid(ex).name(), ex.what());
+      binManager_.addReadyBin(std::move(bin));
+      session.rollback();
     }
-    // assuming ownership over the incoming flowFile
-    session->transfer(flow, Self);
   }
+}
 
-  // migrate bin to ready bin
-  this->binManager_.gatherReadyBins();
-  if (gsl::narrow<uint32_t>(this->binManager_.getBinCount()) > maxBinCount_) {
+std::deque<std::unique_ptr<Bin>> 
BinFiles::gatherReadyBins(core::ProcessContext &context) {
+  binManager_.gatherReadyBins();
+  if (gsl::narrow<uint32_t>(binManager_.getBinCount()) > maxBinCount_) {
     // bin count reach max allowed
-    context->yield();
-    logger_->log_debug("BinFiles reach max bin count %d", 
this->binManager_.getBinCount());
-    this->binManager_.removeOldestBin();
+    context.yield();
+    logger_->log_debug("BinFiles reach max bin count %d", 
binManager_.getBinCount());
+    binManager_.removeOldestBin();
   }
 
-  // get the ready bin
-  std::deque<std::unique_ptr<Bin>> readyBins;
-  binManager_.getReadyBin(readyBins);
+  std::deque<std::unique_ptr<Bin>> ready_bins;
+  binManager_.getReadyBin(ready_bins);
+  return ready_bins;
+}
 
-  // process the ready bin
-  while (!readyBins.empty()) {
-    // create session for merge
-    // we have to create a new session
-    // for each merge as a rollback erases all
-    // previously added files
-    core::ProcessSession mergeSession(context);
-    mergeSession.setMetrics(metrics_);
-    std::unique_ptr<Bin> bin = std::move(readyBins.front());
-    readyBins.pop_front();
-    // add bin's flows to the session
-    this->addFlowsToSession(context.get(), &mergeSession, bin);
-    logger_->log_debug("BinFiles start to process bin %s for group %s", 
bin->getUUIDStr(), bin->getGroupId());
-    if (!this->processBin(context.get(), &mergeSession, bin))
-      this->transferFlowsToFail(context.get(), &mergeSession, bin);
-    mergeSession.commit();
+void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) {
+  if (resurrectFlowFiles(*session)) {
+    context->yield();
+    return;
   }
+
+  assumeOwnershipOfNextBatch(*session);
+  processReadyBins(gatherReadyBins(*context), *session);
 }
 
-void BinFiles::transferFlowsToFail(core::ProcessContext* /*context*/, 
core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+void BinFiles::transferFlowsToFail(core::ProcessSession &session, 
std::unique_ptr<Bin> &bin) {
   std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
   for (const auto& flow : flows) {
-    session->transfer(flow, Failure);
+    session.transfer(flow, Failure);
   }
   flows.clear();
 }
 
-void BinFiles::addFlowsToSession(core::ProcessContext* /*context*/, 
core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+void BinFiles::addFlowsToSession(core::ProcessSession &session, 
std::unique_ptr<Bin> &bin) {
   std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
   for (const auto& flow : flows) {
-    session->add(flow);
+    session.add(flow);
   }
 }
 
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index d32025268..db549f680 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -40,13 +40,8 @@
 
 namespace org::apache::nifi::minifi::processors {
 
-// Bin Class
 class Bin {
  public:
-  // Constructor
-  /*!
-   * Create a new Bin. Note: this object is not thread safe
-   */
   explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const size_t 
&minEntries, const size_t & maxEntries, std::string fileCount, std::string 
groupId)
       : minSize_(minSize),
         maxSize_(maxSize),
@@ -62,7 +57,6 @@ class Bin {
   virtual ~Bin() {
     logger_->log_debug("Bin %s for group %s destroyed", getUUIDStr(), 
groupId_);
   }
-  // check whether the bin is full
   [[nodiscard]] bool isFull() const {
     return queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_;
   }
@@ -70,14 +64,12 @@ class Bin {
   [[nodiscard]] bool isReadyForMerge() const {
     return closed_ || isFull() || (queued_data_size_ >= minSize_ && 
queue_.size() >= minEntries_);
   }
-  // check whether the bin is older than the time specified in msec
   [[nodiscard]] bool isOlderThan(const std::chrono::milliseconds duration) 
const {
     return std::chrono::system_clock::now() > (creation_dated_ + duration);
   }
   std::deque<std::shared_ptr<core::FlowFile>>& getFlowFile() {
     return queue_;
   }
-  // offer the flowfile to the bin
   bool offer(const std::shared_ptr<core::FlowFile>& flow) {
     if (!fileCount_.empty()) {
       std::string value;
@@ -103,18 +95,15 @@ class Bin {
 
     return true;
   }
-  // getBinAge
   [[nodiscard]] std::chrono::system_clock::time_point getCreationDate() const {
     return creation_dated_;
   }
   [[nodiscard]] int getSize() const {
     return gsl::narrow<int>(queue_.size());
   }
-
   [[nodiscard]] utils::SmallString<36> getUUIDStr() const {
     return uuid_.to_string();
   }
-
   [[nodiscard]] std::string getGroupId() const {
     return groupId_;
   }
@@ -124,20 +113,16 @@ class Bin {
   uint64_t maxSize_;
   size_t maxEntries_;
   size_t minEntries_;
-  // Queued data size
   uint64_t queued_data_size_;
   bool closed_{false};
-  // Queue for the Flow File
   std::deque<std::shared_ptr<core::FlowFile>> queue_;
   std::chrono::system_clock::time_point creation_dated_;
   std::string fileCount_;
   std::string groupId_;
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<Bin>::getLogger();
-  // A global unique identifier
   utils::Identifier uuid_;
 };
 
-// BinManager Class
 class BinManager {
  public:
   virtual ~BinManager() {
@@ -175,8 +160,8 @@ class BinManager {
   void gatherReadyBins();
   // marks oldest bin as ready
   void removeOldestBin();
-  // get ready bin from binManager
   void getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins);
+  void addReadyBin(std::unique_ptr<Bin> ready_bin);
 
  private:
   std::mutex mutex_;
@@ -261,7 +246,6 @@ class BinFiles : public core::Processor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  // attributes
   EXTENSIONAPI static const char *FRAGMENT_ID_ATTRIBUTE;
   EXTENSIONAPI static const char *FRAGMENT_INDEX_ATTRIBUTE;
   EXTENSIONAPI static const char *FRAGMENT_COUNT_ATTRIBUTE;
@@ -284,19 +268,22 @@ class BinFiles : public core::Processor {
 
  protected:
   // Allows general pre-processing of a flow file before it is offered to a 
bin. This is called before getGroupId().
-  virtual void preprocessFlowFile(core::ProcessContext *context, 
core::ProcessSession *session, const std::shared_ptr<core::FlowFile>& flow);
+  virtual void preprocessFlowFile(const std::shared_ptr<core::FlowFile>& flow);
   // Returns a group ID representing a bin. This allows flow files to be 
binned into like groups
-  virtual std::string getGroupId(core::ProcessContext* /*context*/, const 
std::shared_ptr<core::FlowFile>& /*flow*/) {
+  virtual std::string getGroupId(const std::shared_ptr<core::FlowFile>& 
/*flow*/) {
     return "";
   }
-  // Processes a single bin.
-  virtual bool processBin(core::ProcessContext* /*context*/, 
core::ProcessSession* /*session*/, std::unique_ptr<Bin>& /*bin*/) {
+  virtual bool processBin(core::ProcessSession& /*session*/, 
std::unique_ptr<Bin>& /*bin*/) {
     return false;
   }
-  // transfer flows to failure in bin
-  static void transferFlowsToFail(core::ProcessContext *context, 
core::ProcessSession *session, std::unique_ptr<Bin> &bin);
-  // moves owned flows to session
-  static void addFlowsToSession(core::ProcessContext *context, 
core::ProcessSession *session, std::unique_ptr<Bin> &bin);
+  static void transferFlowsToFail(core::ProcessSession &session, 
std::unique_ptr<Bin> &bin);
+  static void addFlowsToSession(core::ProcessSession &session, 
std::unique_ptr<Bin> &bin);
+
+  // Sort flow files retrieved from the flow file repository after restart to 
their respective bins
+  bool resurrectFlowFiles(core::ProcessSession &session);
+  void assumeOwnershipOfNextBatch(core::ProcessSession &session);
+  std::deque<std::unique_ptr<Bin>> gatherReadyBins(core::ProcessContext 
&context);
+  void processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, 
core::ProcessSession &session);
 
   BinManager binManager_;
 
diff --git a/extensions/libarchive/MergeContent.cpp 
b/extensions/libarchive/MergeContent.cpp
index 573b975fa..556b584ee 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -128,7 +128,7 @@ void MergeContent::validatePropertyOptions() {
   }
 }
 
-std::string MergeContent::getGroupId(core::ProcessContext*, const 
std::shared_ptr<core::FlowFile>& flow) {
+std::string MergeContent::getGroupId(const std::shared_ptr<core::FlowFile>& 
flow) {
   std::string groupId;
   std::string value;
   if (!correlationAttributeName_.empty()) {
@@ -191,7 +191,7 @@ void MergeContent::onTrigger(core::ProcessContext *context, 
core::ProcessSession
   BinFiles::onTrigger(context, session);
 }
 
-bool MergeContent::processBin(core::ProcessContext *context, 
core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+bool MergeContent::processBin(core::ProcessSession &session, 
std::unique_ptr<Bin> &bin) {
   if (mergeStrategy_ != merge_content_options::MERGE_STRATEGY_DEFRAGMENT && 
mergeStrategy_ != merge_content_options::MERGE_STRATEGY_BIN_PACK)
     return false;
 
@@ -213,7 +213,12 @@ bool MergeContent::processBin(core::ProcessContext 
*context, core::ProcessSessio
         });
   }
 
-  std::shared_ptr<core::FlowFile> merge_flow = 
std::static_pointer_cast<FlowFileRecord>(session->create());
+  std::shared_ptr<core::FlowFile> merge_flow = 
std::static_pointer_cast<FlowFileRecord>(session.create());
+  auto removeMergeFlow = gsl::finally([&](){
+    if (!session.hasBeenTransferred(*merge_flow)) {
+      session.remove(merge_flow);
+    }
+  });
   if (attributeStrategy_ == 
merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) {
     
KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, 
merge_flow);
   } else if (attributeStrategy_ == 
merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) {
@@ -224,7 +229,7 @@ bool MergeContent::processBin(core::ProcessContext 
*context, core::ProcessSessio
   }
 
   auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, const 
io::InputStreamCallback& cb) {
-    return session->read(ff, cb);
+    return session.read(ff, cb);
   };
 
   const char* mimeType;
@@ -249,10 +254,9 @@ bool MergeContent::processBin(core::ProcessContext 
*context, core::ProcessSessio
     return false;
   }
 
-  std::shared_ptr<core::FlowFile> mergeFlow;
   try {
-    mergeBin->merge(context, session, bin->getFlowFile(), *serializer, 
merge_flow);
-    session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, 
mimeType);
+    mergeBin->merge(session, bin->getFlowFile(), *serializer, merge_flow);
+    session.putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, 
mimeType);
   } catch (const std::exception& ex) {
     logger_->log_error("Merge Content merge catch exception, type: %s, what: 
%s", typeid(ex).name(), ex.what());
     return false;
@@ -260,13 +264,13 @@ bool MergeContent::processBin(core::ProcessContext 
*context, core::ProcessSessio
     logger_->log_error("Merge Content merge catch exception, type: %s", 
getCurrentExceptionTypeName());
     return false;
   }
-  session->putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(bin->getSize()));
+  session.putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(bin->getSize()));
 
   // we successfully merge the flow
-  session->transfer(merge_flow, Merge);
+  session.transfer(merge_flow, Merge);
   std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
   for (const auto& flow : flows) {
-    session->transfer(flow, Original);
+    session.transfer(flow, Original);
   }
   logger_->log_info("Merge FlowFile record UUID %s, payload length %d", 
merge_flow->getUUIDStr(), merge_flow->getSize());
 
@@ -278,9 +282,9 @@ 
BinaryConcatenationMerge::BinaryConcatenationMerge(std::string header, std::stri
     footer_(std::move(footer)),
     demarcator_(std::move(demarcator)) {}
 
-void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, 
core::ProcessSession *session,
+void BinaryConcatenationMerge::merge(core::ProcessSession &session,
     std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& 
serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
-  session->write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, 
footer_, demarcator_, flows, serializer});
+  session.write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, 
footer_, demarcator_, flows, serializer});
   std::string fileName;
   if (flows.size() == 1) {
     flows.front()->getAttribute(core::SpecialFlowAttribute::FILENAME, 
fileName);
@@ -288,12 +292,12 @@ void 
BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::Pr
     flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
   }
   if (!fileName.empty())
-    session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, 
fileName);
+    session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, 
fileName);
 }
 
-void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession 
*session,
+void TarMerge::merge(core::ProcessSession &session,
     std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& 
serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
-  session->write(merge_flow, 
ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, 
flows, serializer});
+  session.write(merge_flow, 
ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, 
flows, serializer});
   std::string fileName;
   merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
   if (flows.size() == 1) {
@@ -303,13 +307,13 @@ void TarMerge::merge(core::ProcessContext* /*context*/, 
core::ProcessSession *se
   }
   if (!fileName.empty()) {
     fileName += ".tar";
-    session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, 
fileName);
+    session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, 
fileName);
   }
 }
 
-void ZipMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession 
*session,
+void ZipMerge::merge(core::ProcessSession &session,
     std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& 
serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
-  session->write(merge_flow, 
ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, 
flows, serializer});
+  session.write(merge_flow, 
ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, 
flows, serializer});
   std::string fileName;
   merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
   if (flows.size() == 1) {
@@ -319,13 +323,13 @@ void ZipMerge::merge(core::ProcessContext* /*context*/, 
core::ProcessSession *se
   }
   if (!fileName.empty()) {
     fileName += ".zip";
-    session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, 
fileName);
+    session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, 
fileName);
   }
 }
 
-void AttributeMerger::mergeAttributes(core::ProcessSession *session, const 
std::shared_ptr<core::FlowFile> &merge_flow) {
+void AttributeMerger::mergeAttributes(core::ProcessSession &session, const 
std::shared_ptr<core::FlowFile> &merge_flow) {
   for (const auto& pair : getMergedAttributes()) {
-    session->putAttribute(merge_flow, pair.first, pair.second);
+    session.putAttribute(merge_flow, pair.first, pair.second);
   }
 }
 
diff --git a/extensions/libarchive/MergeContent.h 
b/extensions/libarchive/MergeContent.h
index 87c253b3d..e009df72f 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -56,7 +56,7 @@ class MergeBin {
  public:
   virtual ~MergeBin() = default;
   // merge the flows in the bin
-  virtual void merge(core::ProcessContext *context, core::ProcessSession 
*session,
+  virtual void merge(core::ProcessSession &session,
       std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& 
serializer, const std::shared_ptr<core::FlowFile> &flowFile) = 0;
 };
 
@@ -64,7 +64,7 @@ class BinaryConcatenationMerge : public MergeBin {
  public:
   BinaryConcatenationMerge(std::string header, std::string footer, std::string 
demarcator);
 
-  void merge(core::ProcessContext* context, core::ProcessSession *session,
+  void merge(core::ProcessSession &session,
     std::deque<std::shared_ptr<core::FlowFile>>& flows, FlowFileSerializer& 
serializer, const std::shared_ptr<core::FlowFile>& merge_flow) override;
   // Nest Callback Class for write stream
   class WriteCallback {
@@ -242,13 +242,13 @@ class ArchiveMerge {
 
 class TarMerge: public ArchiveMerge, public MergeBin {
  public:
-  void merge(core::ProcessContext *context, core::ProcessSession *session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows,
+  void merge(core::ProcessSession &session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows,
              FlowFileSerializer& serializer, const 
std::shared_ptr<core::FlowFile> &merge_flow) override;
 };
 
 class ZipMerge: public ArchiveMerge, public MergeBin {
  public:
-  void merge(core::ProcessContext *context, core::ProcessSession *session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows,
+  void merge(core::ProcessSession &session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows,
              FlowFileSerializer& serializer, const 
std::shared_ptr<core::FlowFile> &merge_flow) override;
 };
 
@@ -256,7 +256,7 @@ class AttributeMerger {
  public:
   explicit 
AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>>
 &flows)
     : flows_(flows) {}
-  void mergeAttributes(core::ProcessSession *session, const 
std::shared_ptr<core::FlowFile> &merge_flow);
+  void mergeAttributes(core::ProcessSession &session, const 
std::shared_ptr<core::FlowFile> &merge_flow);
   virtual ~AttributeMerger() = default;
 
  protected:
@@ -287,6 +287,17 @@ class KeepAllUniqueAttributesMerger: public 
AttributeMerger {
   std::vector<std::string> removed_attributes_;
 };
 
+/**
+ * A processor that merges multiple correlated flow files to a single flow file
+ *
+ * Concepts:
+ * - Batch size: represents the maximum number of flow files to be processed 
from the incoming relationship
+ * - Bin (or bundle): represents a set of flow files that belong together 
defined by the processor properties. Correlated flow files are defined by the 
CorrelationAttributeName property which
+ *                    defines the attribute that provides the groupid for the 
bin the flow file belongs to
+ * - Ready bin: when a bin reaches a limit defined by the maximum age or the 
maximum size, the bin becomes ready, and ready bins can be merged
+ * - Group: a set of bins with the same groupid. In case a bin cannot accept a 
new flow files (e.g. it would go above its size limit), a new bin is created 
with this new flow file and added
+ *          to the same group of bins
+ */
 class MergeContent : public processors::BinFiles {
  public:
   explicit MergeContent(const std::string& name, const utils::Identifier& uuid 
= {})
@@ -373,11 +384,11 @@ class MergeContent : public processors::BinFiles {
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory) override;
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) 
override;
   void initialize() override;
-  bool processBin(core::ProcessContext *context, core::ProcessSession 
*session, std::unique_ptr<Bin> &bin) override;
+  bool processBin(core::ProcessSession &session, std::unique_ptr<Bin> &bin) 
override;
 
  protected:
   // Returns a group ID representing a bin. This allows flow files to be 
binned into like groups
-  std::string getGroupId(core::ProcessContext *context, const 
std::shared_ptr<core::FlowFile>& flow) override;
+  std::string getGroupId(const std::shared_ptr<core::FlowFile>& flow) override;
   // check whether the defragment bin is validate
   static bool checkDefragment(std::unique_ptr<Bin> &bin);
 
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index 36076e230..86fe5cf9e 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -151,6 +151,8 @@ class ProcessSession : public ReferenceContainer {
     metrics_ = metrics;
   }
 
+  bool hasBeenTransferred(const core::FlowFile &flow) const;
+
 // Prevent default copy constructor and assignment operation
 // Only support pass by reference or pointer
   ProcessSession(const ProcessSession &parent) = delete;
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 56e5e0582..6660b634d 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -1147,4 +1147,9 @@ bool ProcessSession::existsFlowFileInRelationship(const 
Relationship &relationsh
   });
 }
 
+bool ProcessSession::hasBeenTransferred(const core::FlowFile &flow) const {
+  return (updated_relationships_.contains(flow.getUUID()) && 
updated_relationships_.at(flow.getUUID()) != nullptr) ||
+    (added_flowfiles_.contains(flow.getUUID()) && 
added_flowfiles_.at(flow.getUUID()).rel != nullptr);
+}
+
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/io/StreamSlice.cpp b/libminifi/src/io/StreamSlice.cpp
index 50fa91c37..3a9f9a457 100644
--- a/libminifi/src/io/StreamSlice.cpp
+++ b/libminifi/src/io/StreamSlice.cpp
@@ -23,7 +23,7 @@ namespace org::apache::nifi::minifi::io {
 StreamSlice::StreamSlice(std::shared_ptr<io::InputStream> stream, size_t 
offset, size_t size) : stream_(std::move(stream)), slice_offset_(offset), 
slice_size_(size) {
   stream_->seek(slice_offset_);
   if (stream_->size() < slice_offset_ + slice_size_)
-    throw std::invalid_argument("StreamSlice is bigger than the Stream");
+    throw std::invalid_argument(fmt::format("StreamSlice is bigger than the 
Stream, Stream size: {}, StreamSlice size: {}, offset: {}", stream_->size(), 
slice_size_, slice_offset_));
 }
 
 size_t StreamSlice::read(std::span<std::byte> out_buffer) {
diff --git a/libminifi/test/unit/StreamTests.cpp 
b/libminifi/test/unit/StreamTests.cpp
index a104ac74c..420ee3b0d 100644
--- a/libminifi/test/unit/StreamTests.cpp
+++ b/libminifi/test/unit/StreamTests.cpp
@@ -85,8 +85,8 @@ TEST_CASE("InvalidStreamSliceTest", "[teststreamslice]") {
   std::shared_ptr<minifi::io::BaseStream> base = 
std::make_shared<minifi::io::BufferStream>();
   base->write((const uint8_t*)"\x01\x02\x03\x04\x05\x06\x07\x08", 8);
   auto input_stream = std::static_pointer_cast<minifi::io::InputStream>(base);
-  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 
0, 9), "StreamSlice is bigger than the Stream");
-  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 
7, 3), "StreamSlice is bigger than the Stream");
+  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 
0, 9), "StreamSlice is bigger than the Stream, Stream size: 8, StreamSlice 
size: 9, offset: 0");
+  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 
7, 3), "StreamSlice is bigger than the Stream, Stream size: 8, StreamSlice 
size: 3, offset: 7");
 }
 
 TEST_CASE("StreamSliceTest1", "[teststreamslice]") {

Reply via email to