This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit d15cc2546ebadb7c929fc58949e41964345ef58f Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Jul 24 11:49:50 2023 +0200 MINIFICPP-2173 Fix MergeContent error handling issues - Remove intermediate individual `ProcessSession` for every processed bin - Rollback flow files of a merge if the merge fails with an exception and put them back to the ready bins - Remove `merge_flow` from session in case the merge fails to avoid throwing an exception for not finding the transfer relationship - Do not yield and continue processing of the bins in case putting one of the flow files in a bin fails - Log stream size and offset values in case of a `StreamSlice` creation failure - Remove unused function parameters and use references instead of raw pointers Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1616 --- extensions/libarchive/BinFiles.cpp | 127 +++++++++++++++++--------------- extensions/libarchive/BinFiles.h | 37 +++------- extensions/libarchive/MergeContent.cpp | 46 ++++++------ extensions/libarchive/MergeContent.h | 25 +++++-- libminifi/include/core/ProcessSession.h | 2 + libminifi/src/core/ProcessSession.cpp | 5 ++ libminifi/src/io/StreamSlice.cpp | 2 +- libminifi/test/unit/StreamTests.cpp | 4 +- 8 files changed, 133 insertions(+), 115 deletions(-) diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp index 498ee4b3d..1b0f61e1d 100644 --- a/extensions/libarchive/BinFiles.cpp +++ b/extensions/libarchive/BinFiles.cpp @@ -80,7 +80,7 @@ void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFac } } -void BinFiles::preprocessFlowFile(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/, const std::shared_ptr<core::FlowFile>& flow) { +void BinFiles::preprocessFlowFile(const std::shared_ptr<core::FlowFile>& flow) { // handle backward compatibility with old segment attributes std::string value; if (!flow->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, value) && flow->getAttribute(BinFiles::SEGMENT_COUNT_ATTRIBUTE, value)) { @@ -156,6 +156,11 @@ void BinManager::getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins) { } } +void BinManager::addReadyBin(std::unique_ptr<Bin> ready_bin) { + std::lock_guard<std::mutex> lock(mutex_); + readyBin_.push_back(std::move(ready_bin)); +} + bool BinManager::offer(const std::string &group, const std::shared_ptr<core::FlowFile>& flow) { std::lock_guard < std::mutex > lock(mutex_); if (flow->getSize() > maxSize_) { @@ -203,93 +208,97 @@ bool BinManager::offer(const std::string &group, const std::shared_ptr<core::Flo return true; } -void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - // Rollback is not viable for this processor!! - { - // process resurrected FlowFiles first - auto flowFiles = file_store_.getNewFlowFiles(); - // these are already processed FlowFiles, that we own - bool hadFailure = false; - for (auto &file : flowFiles) { - std::string groupId = getGroupId(context.get(), file); - bool offer = this->binManager_.offer(groupId, file); - if (!offer) { - session->transfer(file, Failure); - hadFailure = true; - } else { - // no need to route successfully captured such files as we already own them - } - } - if (hadFailure) { - context->yield(); - return; +bool BinFiles::resurrectFlowFiles(core::ProcessSession &session) { + auto flow_files = file_store_.getNewFlowFiles(); + // these are already processed FlowFiles, that we own + bool had_failure = false; + for (auto &file : flow_files) { + std::string group_id = getGroupId(file); + if (!binManager_.offer(group_id, file)) { + session.transfer(file, Failure); + had_failure = true; } + // no need to route successfully captured such files as we already own them in the Self relationship } + return had_failure; +} +void BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) { for (size_t i = 0; i < batchSize_; ++i) { - auto flow = session->get(); + auto flow = session.get(); if (flow == nullptr) { break; } - preprocessFlowFile(context.get(), session.get(), flow); - std::string groupId = getGroupId(context.get(), flow); + preprocessFlowFile(flow); + std::string group_id = getGroupId(flow); - bool offer = this->binManager_.offer(groupId, flow); + bool offer = binManager_.offer(group_id, flow); if (!offer) { - session->transfer(flow, Failure); - context->yield(); - return; + session.transfer(flow, Failure); + continue; + } + session.transfer(flow, Self); + } + session.commit(); +} + +void BinFiles::processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, core::ProcessSession &session) { + while (!ready_bins.empty()) { + std::unique_ptr<Bin> bin = std::move(ready_bins.front()); + ready_bins.pop_front(); + + try { + addFlowsToSession(session, bin); + logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId()); + if (!processBin(session, bin)) + transferFlowsToFail(session, bin); + session.commit(); + } catch(const std::exception& ex) { + logger_->log_error("Caught Exception type: '%s' while merging ready bin: '%s'", typeid(ex).name(), ex.what()); + binManager_.addReadyBin(std::move(bin)); + session.rollback(); } - // assuming ownership over the incoming flowFile - session->transfer(flow, Self); } +} - // migrate bin to ready bin - this->binManager_.gatherReadyBins(); - if (gsl::narrow<uint32_t>(this->binManager_.getBinCount()) > maxBinCount_) { +std::deque<std::unique_ptr<Bin>> BinFiles::gatherReadyBins(core::ProcessContext &context) { + binManager_.gatherReadyBins(); + if (gsl::narrow<uint32_t>(binManager_.getBinCount()) > maxBinCount_) { // bin count reach max allowed - context->yield(); - logger_->log_debug("BinFiles reach max bin count %d", this->binManager_.getBinCount()); - this->binManager_.removeOldestBin(); + context.yield(); + logger_->log_debug("BinFiles reach max bin count %d", binManager_.getBinCount()); + binManager_.removeOldestBin(); } - // get the ready bin - std::deque<std::unique_ptr<Bin>> readyBins; - binManager_.getReadyBin(readyBins); + std::deque<std::unique_ptr<Bin>> ready_bins; + binManager_.getReadyBin(ready_bins); + return ready_bins; +} - // process the ready bin - while (!readyBins.empty()) { - // create session for merge - // we have to create a new session - // for each merge as a rollback erases all - // previously added files - core::ProcessSession mergeSession(context); - mergeSession.setMetrics(metrics_); - std::unique_ptr<Bin> bin = std::move(readyBins.front()); - readyBins.pop_front(); - // add bin's flows to the session - this->addFlowsToSession(context.get(), &mergeSession, bin); - logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId()); - if (!this->processBin(context.get(), &mergeSession, bin)) - this->transferFlowsToFail(context.get(), &mergeSession, bin); - mergeSession.commit(); +void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + if (resurrectFlowFiles(*session)) { + context->yield(); + return; } + + assumeOwnershipOfNextBatch(*session); + processReadyBins(gatherReadyBins(*context), *session); } -void BinFiles::transferFlowsToFail(core::ProcessContext* /*context*/, core::ProcessSession *session, std::unique_ptr<Bin> &bin) { +void BinFiles::transferFlowsToFail(core::ProcessSession &session, std::unique_ptr<Bin> &bin) { std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile(); for (const auto& flow : flows) { - session->transfer(flow, Failure); + session.transfer(flow, Failure); } flows.clear(); } -void BinFiles::addFlowsToSession(core::ProcessContext* /*context*/, core::ProcessSession *session, std::unique_ptr<Bin> &bin) { +void BinFiles::addFlowsToSession(core::ProcessSession &session, std::unique_ptr<Bin> &bin) { std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile(); for (const auto& flow : flows) { - session->add(flow); + session.add(flow); } } diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h index d32025268..db549f680 100644 --- a/extensions/libarchive/BinFiles.h +++ b/extensions/libarchive/BinFiles.h @@ -40,13 +40,8 @@ namespace org::apache::nifi::minifi::processors { -// Bin Class class Bin { public: - // Constructor - /*! - * Create a new Bin. Note: this object is not thread safe - */ explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const size_t &minEntries, const size_t & maxEntries, std::string fileCount, std::string groupId) : minSize_(minSize), maxSize_(maxSize), @@ -62,7 +57,6 @@ class Bin { virtual ~Bin() { logger_->log_debug("Bin %s for group %s destroyed", getUUIDStr(), groupId_); } - // check whether the bin is full [[nodiscard]] bool isFull() const { return queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_; } @@ -70,14 +64,12 @@ class Bin { [[nodiscard]] bool isReadyForMerge() const { return closed_ || isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); } - // check whether the bin is older than the time specified in msec [[nodiscard]] bool isOlderThan(const std::chrono::milliseconds duration) const { return std::chrono::system_clock::now() > (creation_dated_ + duration); } std::deque<std::shared_ptr<core::FlowFile>>& getFlowFile() { return queue_; } - // offer the flowfile to the bin bool offer(const std::shared_ptr<core::FlowFile>& flow) { if (!fileCount_.empty()) { std::string value; @@ -103,18 +95,15 @@ class Bin { return true; } - // getBinAge [[nodiscard]] std::chrono::system_clock::time_point getCreationDate() const { return creation_dated_; } [[nodiscard]] int getSize() const { return gsl::narrow<int>(queue_.size()); } - [[nodiscard]] utils::SmallString<36> getUUIDStr() const { return uuid_.to_string(); } - [[nodiscard]] std::string getGroupId() const { return groupId_; } @@ -124,20 +113,16 @@ class Bin { uint64_t maxSize_; size_t maxEntries_; size_t minEntries_; - // Queued data size uint64_t queued_data_size_; bool closed_{false}; - // Queue for the Flow File std::deque<std::shared_ptr<core::FlowFile>> queue_; std::chrono::system_clock::time_point creation_dated_; std::string fileCount_; std::string groupId_; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<Bin>::getLogger(); - // A global unique identifier utils::Identifier uuid_; }; -// BinManager Class class BinManager { public: virtual ~BinManager() { @@ -175,8 +160,8 @@ class BinManager { void gatherReadyBins(); // marks oldest bin as ready void removeOldestBin(); - // get ready bin from binManager void getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins); + void addReadyBin(std::unique_ptr<Bin> ready_bin); private: std::mutex mutex_; @@ -261,7 +246,6 @@ class BinFiles : public core::Processor { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - // attributes EXTENSIONAPI static const char *FRAGMENT_ID_ATTRIBUTE; EXTENSIONAPI static const char *FRAGMENT_INDEX_ATTRIBUTE; EXTENSIONAPI static const char *FRAGMENT_COUNT_ATTRIBUTE; @@ -284,19 +268,22 @@ class BinFiles : public core::Processor { protected: // Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId(). - virtual void preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, const std::shared_ptr<core::FlowFile>& flow); + virtual void preprocessFlowFile(const std::shared_ptr<core::FlowFile>& flow); // Returns a group ID representing a bin. This allows flow files to be binned into like groups - virtual std::string getGroupId(core::ProcessContext* /*context*/, const std::shared_ptr<core::FlowFile>& /*flow*/) { + virtual std::string getGroupId(const std::shared_ptr<core::FlowFile>& /*flow*/) { return ""; } - // Processes a single bin. - virtual bool processBin(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/, std::unique_ptr<Bin>& /*bin*/) { + virtual bool processBin(core::ProcessSession& /*session*/, std::unique_ptr<Bin>& /*bin*/) { return false; } - // transfer flows to failure in bin - static void transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin); - // moves owned flows to session - static void addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin); + static void transferFlowsToFail(core::ProcessSession &session, std::unique_ptr<Bin> &bin); + static void addFlowsToSession(core::ProcessSession &session, std::unique_ptr<Bin> &bin); + + // Sort flow files retrieved from the flow file repository after restart to their respective bins + bool resurrectFlowFiles(core::ProcessSession &session); + void assumeOwnershipOfNextBatch(core::ProcessSession &session); + std::deque<std::unique_ptr<Bin>> gatherReadyBins(core::ProcessContext &context); + void processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, core::ProcessSession &session); BinManager binManager_; diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp index 573b975fa..556b584ee 100644 --- a/extensions/libarchive/MergeContent.cpp +++ b/extensions/libarchive/MergeContent.cpp @@ -128,7 +128,7 @@ void MergeContent::validatePropertyOptions() { } } -std::string MergeContent::getGroupId(core::ProcessContext*, const std::shared_ptr<core::FlowFile>& flow) { +std::string MergeContent::getGroupId(const std::shared_ptr<core::FlowFile>& flow) { std::string groupId; std::string value; if (!correlationAttributeName_.empty()) { @@ -191,7 +191,7 @@ void MergeContent::onTrigger(core::ProcessContext *context, core::ProcessSession BinFiles::onTrigger(context, session); } -bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) { +bool MergeContent::processBin(core::ProcessSession &session, std::unique_ptr<Bin> &bin) { if (mergeStrategy_ != merge_content_options::MERGE_STRATEGY_DEFRAGMENT && mergeStrategy_ != merge_content_options::MERGE_STRATEGY_BIN_PACK) return false; @@ -213,7 +213,12 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio }); } - std::shared_ptr<core::FlowFile> merge_flow = std::static_pointer_cast<FlowFileRecord>(session->create()); + std::shared_ptr<core::FlowFile> merge_flow = std::static_pointer_cast<FlowFileRecord>(session.create()); + auto removeMergeFlow = gsl::finally([&](){ + if (!session.hasBeenTransferred(*merge_flow)) { + session.remove(merge_flow); + } + }); if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) { KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, merge_flow); } else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) { @@ -224,7 +229,7 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio } auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, const io::InputStreamCallback& cb) { - return session->read(ff, cb); + return session.read(ff, cb); }; const char* mimeType; @@ -249,10 +254,9 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio return false; } - std::shared_ptr<core::FlowFile> mergeFlow; try { - mergeBin->merge(context, session, bin->getFlowFile(), *serializer, merge_flow); - session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, mimeType); + mergeBin->merge(session, bin->getFlowFile(), *serializer, merge_flow); + session.putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, mimeType); } catch (const std::exception& ex) { logger_->log_error("Merge Content merge catch exception, type: %s, what: %s", typeid(ex).name(), ex.what()); return false; @@ -260,13 +264,13 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio logger_->log_error("Merge Content merge catch exception, type: %s", getCurrentExceptionTypeName()); return false; } - session->putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize())); + session.putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize())); // we successfully merge the flow - session->transfer(merge_flow, Merge); + session.transfer(merge_flow, Merge); std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile(); for (const auto& flow : flows) { - session->transfer(flow, Original); + session.transfer(flow, Original); } logger_->log_info("Merge FlowFile record UUID %s, payload length %d", merge_flow->getUUIDStr(), merge_flow->getSize()); @@ -278,9 +282,9 @@ BinaryConcatenationMerge::BinaryConcatenationMerge(std::string header, std::stri footer_(std::move(footer)), demarcator_(std::move(demarcator)) {} -void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session, +void BinaryConcatenationMerge::merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) { - session->write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, footer_, demarcator_, flows, serializer}); + session.write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, footer_, demarcator_, flows, serializer}); std::string fileName; if (flows.size() == 1) { flows.front()->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName); @@ -288,12 +292,12 @@ void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::Pr flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); } if (!fileName.empty()) - session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); + session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); } -void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session, +void TarMerge::merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) { - session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, flows, serializer}); + session.write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, flows, serializer}); std::string fileName; merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName); if (flows.size() == 1) { @@ -303,13 +307,13 @@ void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *se } if (!fileName.empty()) { fileName += ".tar"; - session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); + session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); } } -void ZipMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session, +void ZipMerge::merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) { - session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, flows, serializer}); + session.write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, flows, serializer}); std::string fileName; merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName); if (flows.size() == 1) { @@ -319,13 +323,13 @@ void ZipMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *se } if (!fileName.empty()) { fileName += ".zip"; - session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); + session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); } } -void AttributeMerger::mergeAttributes(core::ProcessSession *session, const std::shared_ptr<core::FlowFile> &merge_flow) { +void AttributeMerger::mergeAttributes(core::ProcessSession &session, const std::shared_ptr<core::FlowFile> &merge_flow) { for (const auto& pair : getMergedAttributes()) { - session->putAttribute(merge_flow, pair.first, pair.second); + session.putAttribute(merge_flow, pair.first, pair.second); } } diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h index 87c253b3d..e009df72f 100644 --- a/extensions/libarchive/MergeContent.h +++ b/extensions/libarchive/MergeContent.h @@ -56,7 +56,7 @@ class MergeBin { public: virtual ~MergeBin() = default; // merge the flows in the bin - virtual void merge(core::ProcessContext *context, core::ProcessSession *session, + virtual void merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile) = 0; }; @@ -64,7 +64,7 @@ class BinaryConcatenationMerge : public MergeBin { public: BinaryConcatenationMerge(std::string header, std::string footer, std::string demarcator); - void merge(core::ProcessContext* context, core::ProcessSession *session, + void merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>>& flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) override; // Nest Callback Class for write stream class WriteCallback { @@ -242,13 +242,13 @@ class ArchiveMerge { class TarMerge: public ArchiveMerge, public MergeBin { public: - void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, + void merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override; }; class ZipMerge: public ArchiveMerge, public MergeBin { public: - void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, + void merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override; }; @@ -256,7 +256,7 @@ class AttributeMerger { public: explicit AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows) : flows_(flows) {} - void mergeAttributes(core::ProcessSession *session, const std::shared_ptr<core::FlowFile> &merge_flow); + void mergeAttributes(core::ProcessSession &session, const std::shared_ptr<core::FlowFile> &merge_flow); virtual ~AttributeMerger() = default; protected: @@ -287,6 +287,17 @@ class KeepAllUniqueAttributesMerger: public AttributeMerger { std::vector<std::string> removed_attributes_; }; +/** + * A processor that merges multiple correlated flow files to a single flow file + * + * Concepts: + * - Batch size: represents the maximum number of flow files to be processed from the incoming relationship + * - Bin (or bundle): represents a set of flow files that belong together defined by the processor properties. Correlated flow files are defined by the CorrelationAttributeName property which + * defines the attribute that provides the groupid for the bin the flow file belongs to + * - Ready bin: when a bin reaches a limit defined by the maximum age or the maximum size, the bin becomes ready, and ready bins can be merged + * - Group: a set of bins with the same groupid. In case a bin cannot accept a new flow files (e.g. it would go above its size limit), a new bin is created with this new flow file and added + * to the same group of bins + */ class MergeContent : public processors::BinFiles { public: explicit MergeContent(const std::string& name, const utils::Identifier& uuid = {}) @@ -373,11 +384,11 @@ class MergeContent : public processors::BinFiles { void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; void initialize() override; - bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) override; + bool processBin(core::ProcessSession &session, std::unique_ptr<Bin> &bin) override; protected: // Returns a group ID representing a bin. This allows flow files to be binned into like groups - std::string getGroupId(core::ProcessContext *context, const std::shared_ptr<core::FlowFile>& flow) override; + std::string getGroupId(const std::shared_ptr<core::FlowFile>& flow) override; // check whether the defragment bin is validate static bool checkDefragment(std::unique_ptr<Bin> &bin); diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 36076e230..86fe5cf9e 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -151,6 +151,8 @@ class ProcessSession : public ReferenceContainer { metrics_ = metrics; } + bool hasBeenTransferred(const core::FlowFile &flow) const; + // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer ProcessSession(const ProcessSession &parent) = delete; diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 56e5e0582..6660b634d 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -1147,4 +1147,9 @@ bool ProcessSession::existsFlowFileInRelationship(const Relationship &relationsh }); } +bool ProcessSession::hasBeenTransferred(const core::FlowFile &flow) const { + return (updated_relationships_.contains(flow.getUUID()) && updated_relationships_.at(flow.getUUID()) != nullptr) || + (added_flowfiles_.contains(flow.getUUID()) && added_flowfiles_.at(flow.getUUID()).rel != nullptr); +} + } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/io/StreamSlice.cpp b/libminifi/src/io/StreamSlice.cpp index 50fa91c37..3a9f9a457 100644 --- a/libminifi/src/io/StreamSlice.cpp +++ b/libminifi/src/io/StreamSlice.cpp @@ -23,7 +23,7 @@ namespace org::apache::nifi::minifi::io { StreamSlice::StreamSlice(std::shared_ptr<io::InputStream> stream, size_t offset, size_t size) : stream_(std::move(stream)), slice_offset_(offset), slice_size_(size) { stream_->seek(slice_offset_); if (stream_->size() < slice_offset_ + slice_size_) - throw std::invalid_argument("StreamSlice is bigger than the Stream"); + throw std::invalid_argument(fmt::format("StreamSlice is bigger than the Stream, Stream size: {}, StreamSlice size: {}, offset: {}", stream_->size(), slice_size_, slice_offset_)); } size_t StreamSlice::read(std::span<std::byte> out_buffer) { diff --git a/libminifi/test/unit/StreamTests.cpp b/libminifi/test/unit/StreamTests.cpp index a104ac74c..420ee3b0d 100644 --- a/libminifi/test/unit/StreamTests.cpp +++ b/libminifi/test/unit/StreamTests.cpp @@ -85,8 +85,8 @@ TEST_CASE("InvalidStreamSliceTest", "[teststreamslice]") { std::shared_ptr<minifi::io::BaseStream> base = std::make_shared<minifi::io::BufferStream>(); base->write((const uint8_t*)"\x01\x02\x03\x04\x05\x06\x07\x08", 8); auto input_stream = std::static_pointer_cast<minifi::io::InputStream>(base); - REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 0, 9), "StreamSlice is bigger than the Stream"); - REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 7, 3), "StreamSlice is bigger than the Stream"); + REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 0, 9), "StreamSlice is bigger than the Stream, Stream size: 8, StreamSlice size: 9, offset: 0"); + REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 7, 3), "StreamSlice is bigger than the Stream, Stream size: 8, StreamSlice size: 3, offset: 7"); } TEST_CASE("StreamSliceTest1", "[teststreamslice]") {
