This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 365523f59fcb3a6daf9b226bd6d5f79192fd7b2f Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Aug 31 16:47:38 2020 +0200 MINIFICPP-1321 Do not override MergeContent output attributes This commit handles the case where flowfiles processed by MergeContent share common attributes with the same value (like mime type), which, after the attribute merge, would override the output attribute set individually by MergeContent. Signed-off-by: Arpad Boda <[email protected]> This closes #888 --- extensions/libarchive/MergeContent.cpp | 107 +++++++++++++++--------- extensions/libarchive/MergeContent.h | 29 ++++--- libminifi/test/archive-tests/MergeFileTests.cpp | 8 +- 3 files changed, 88 insertions(+), 56 deletions(-) diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp index 25a32d1..c0761e5 100644 --- a/extensions/libarchive/MergeContent.cpp +++ b/extensions/libarchive/MergeContent.cpp @@ -147,6 +147,9 @@ void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessio if (context->getProperty(AttributeStrategy.getName(), value) && !value.empty()) { attributeStrategy_ = value; } + + validatePropertyOptions(); + if (mergeStrategy_ == merge_content_options::MERGE_STRATEGY_DEFRAGMENT) { binManager_.setFileCount(FRAGMENT_COUNT_ATTRIBUTE); } @@ -170,7 +173,34 @@ void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessio } } -std::string MergeContent::getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow) { +void MergeContent::validatePropertyOptions() { + if (mergeStrategy_ != merge_content_options::MERGE_STRATEGY_DEFRAGMENT && + mergeStrategy_ != merge_content_options::MERGE_STRATEGY_BIN_PACK) { + logger_->log_error("Merge strategy not supported %s", mergeStrategy_); + throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid merge strategy: " + attributeStrategy_); + } + + if (mergeFormat_ != merge_content_options::MERGE_FORMAT_CONCAT_VALUE && + mergeFormat_ != 
merge_content_options::MERGE_FORMAT_TAR_VALUE && + mergeFormat_ != merge_content_options::MERGE_FORMAT_ZIP_VALUE) { + logger_->log_error("Merge format not supported %s", mergeFormat_); + throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid merge format: " + mergeFormat_); + } + + if (delimiterStrategy_ != merge_content_options::DELIMITER_STRATEGY_FILENAME && + delimiterStrategy_ != merge_content_options::DELIMITER_STRATEGY_TEXT) { + logger_->log_error("Delimiter strategy not supported %s", delimiterStrategy_); + throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid delimiter strategy: " + delimiterStrategy_); + } + + if (attributeStrategy_ != merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON && + attributeStrategy_ != merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) { + logger_->log_error("Attribute strategy not supported %s", attributeStrategy_); + throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid attribute strategy: " + attributeStrategy_); + } +} + +std::string MergeContent::getGroupId(core::ProcessContext*, std::shared_ptr<core::FlowFile> flow) { std::string groupId = ""; std::string value; if (!correlationAttributeName_.empty()) { @@ -258,6 +288,16 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio }); } + std::shared_ptr<core::FlowFile> merge_flow = std::static_pointer_cast<FlowFileRecord>(session->create()); + if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) + KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, merge_flow); + else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) + KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, merge_flow); + else { + logger_->log_error("Attribute strategy not supported %s", attributeStrategy_); + return false; + } + std::unique_ptr<MergeBin> mergeBin; if (mergeFormat_ == 
merge_content_options::MERGE_FORMAT_CONCAT_VALUE) mergeBin = utils::make_unique<BinaryConcatenationMerge>(); @@ -270,41 +310,31 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio return false; } - std::shared_ptr<core::FlowFile> mergeFlow; try { - mergeFlow = mergeBin->merge(context, session, bin->getFlowFile(), headerContent_, footerContent_, demarcatorContent_); + mergeBin->merge(context, session, bin->getFlowFile(), headerContent_, footerContent_, demarcatorContent_, merge_flow); } catch (...) { logger_->log_error("Merge Content merge catch exception"); return false; } - session->putAttribute(mergeFlow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize())); - - if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) - KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, mergeFlow); - else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) - KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, mergeFlow); - else { - logger_->log_error("Attribute strategy not supported %s", attributeStrategy_); - throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid attribute strategy: " + attributeStrategy_); - } + session->putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize())); // we successfully merge the flow - session->transfer(mergeFlow, Merge); + session->transfer(merge_flow, Merge); std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile(); for (auto flow : flows) { session->transfer(flow, Original); } - logger_->log_info("Merge FlowFile record UUID %s, payload length %d", mergeFlow->getUUIDStr(), mergeFlow->getSize()); + logger_->log_info("Merge FlowFile record UUID %s, payload length %d", merge_flow->getUUIDStr(), merge_flow->getSize()); return true; } -std::shared_ptr<core::FlowFile> BinaryConcatenationMerge::merge(core::ProcessContext *context, 
core::ProcessSession *session, - std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator) { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); +void BinaryConcatenationMerge::merge(core::ProcessContext*, core::ProcessSession *session, + std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator, + const std::shared_ptr<core::FlowFile> &merge_flow) { BinaryConcatenationMerge::WriteCallback callback(header, footer, demarcator, flows, session); - session->write(flowFile, &callback); - session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), getMergedContentType()); + session->write(merge_flow, &callback); + session->putAttribute(merge_flow, FlowAttributeKey(MIME_TYPE), getMergedContentType()); std::string fileName; if (flows.size() == 1) { flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); @@ -312,18 +342,16 @@ std::shared_ptr<core::FlowFile> BinaryConcatenationMerge::merge(core::ProcessCon flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); } if (!fileName.empty()) - session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName); - return flowFile; + session->putAttribute(merge_flow, FlowAttributeKey(FILENAME), fileName); } -std::shared_ptr<core::FlowFile> TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, - std::string &footer, std::string &demarcator) { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); +void TarMerge::merge(core::ProcessContext*, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string&, + std::string&, std::string&, const std::shared_ptr<core::FlowFile> &merge_flow) { ArchiveMerge::WriteCallback 
callback(std::string(merge_content_options::MERGE_FORMAT_TAR_VALUE), flows, session); - session->write(flowFile, &callback); - session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), getMergedContentType()); + session->write(merge_flow, &callback); + session->putAttribute(merge_flow, FlowAttributeKey(MIME_TYPE), getMergedContentType()); std::string fileName; - flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + merge_flow->getAttribute(FlowAttributeKey(FILENAME), fileName); if (flows.size() == 1) { flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); } else { @@ -331,19 +359,17 @@ std::shared_ptr<core::FlowFile> TarMerge::merge(core::ProcessContext *context, c } if (!fileName.empty()) { fileName += ".tar"; - session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName); + session->putAttribute(merge_flow, FlowAttributeKey(FILENAME), fileName); } - return flowFile; } -std::shared_ptr<core::FlowFile> ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, - std::string &footer, std::string &demarcator) { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); +void ZipMerge::merge(core::ProcessContext*, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string&, + std::string&, std::string&, const std::shared_ptr<core::FlowFile> &merge_flow) { ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_ZIP_VALUE), flows, session); - session->write(flowFile, &callback); - session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), getMergedContentType()); + session->write(merge_flow, &callback); + session->putAttribute(merge_flow, FlowAttributeKey(MIME_TYPE), getMergedContentType()); std::string fileName; - flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + merge_flow->getAttribute(FlowAttributeKey(FILENAME), 
fileName); if (flows.size() == 1) { flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); } else { @@ -351,12 +377,11 @@ std::shared_ptr<core::FlowFile> ZipMerge::merge(core::ProcessContext *context, c } if (!fileName.empty()) { fileName += ".zip"; - session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName); + session->putAttribute(merge_flow, FlowAttributeKey(FILENAME), fileName); } - return flowFile; } -void AttributeMerger::mergeAttributes(core::ProcessSession *session, std::shared_ptr<core::FlowFile> &merge_flow) { +void AttributeMerger::mergeAttributes(core::ProcessSession *session, const std::shared_ptr<core::FlowFile> &merge_flow) { for (const auto& pair : getMergedAttributes()) { session->putAttribute(merge_flow, pair.first, pair.second); } @@ -372,7 +397,7 @@ std::map<std::string, std::string> AttributeMerger::getMergedAttributes() { return *std::accumulate(std::next(flows_.cbegin()), flows_.cend(), &sum, merge_attributes); } -void KeepOnlyCommonAttributesMerger::processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string>& merged_attributes) { +void KeepOnlyCommonAttributesMerger::processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) { auto flow_attributes = flow_file->getAttributes(); std::map<std::string, std::string> tmp_merged; std::set_intersection(std::make_move_iterator(merged_attributes.begin()), std::make_move_iterator(merged_attributes.end()), @@ -380,7 +405,7 @@ void KeepOnlyCommonAttributesMerger::processFlowFile(const std::shared_ptr<core: merged_attributes = std::move(tmp_merged); } -void KeepAllUniqueAttributesMerger::processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string>& merged_attributes) { +void KeepAllUniqueAttributesMerger::processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) { auto 
flow_attributes = flow_file->getAttributes(); for (auto&& attr : flow_attributes) { if(std::find(removed_attributes_.cbegin(), removed_attributes_.cend(), attr.first) != removed_attributes_.cend()) { diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h index 32475a1..80ef0f7 100644 --- a/extensions/libarchive/MergeContent.h +++ b/extensions/libarchive/MergeContent.h @@ -55,11 +55,11 @@ class MergeBin { public: virtual ~MergeBin() = default; - virtual std::string getMergedContentType() = 0; // merge the flows in the bin - virtual std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, core::ProcessSession *session, - std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator) = 0; + virtual void merge(core::ProcessContext *context, core::ProcessSession *session, + std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator, + const std::shared_ptr<core::FlowFile> &flowFile) = 0; }; // BinaryConcatenationMerge Class @@ -69,8 +69,9 @@ public: std::string getMergedContentType() { return mimeType; } - std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, core::ProcessSession *session, - std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator); + virtual void merge( + core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, + std::string &header, std::string &footer, std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) override; // Nest Callback Class for read stream class ReadCallback : public InputStreamCallback { public: @@ -247,8 +248,8 @@ public: class TarMerge: public ArchiveMerge, public MergeBin { public: static const char *mimeType; - std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, core::ProcessSession *session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, - std::string &demarcator); + void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, + std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) override; std::string getMergedContentType() { return mimeType; } @@ -258,8 +259,8 @@ public: class ZipMerge: public ArchiveMerge, public MergeBin { public: static const char *mimeType; - std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, - std::string &demarcator); + void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, + std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) override; std::string getMergedContentType() { return mimeType; } @@ -269,11 +270,11 @@ class AttributeMerger { public: explicit AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows) : flows_(flows) {} - void mergeAttributes(core::ProcessSession *session, std::shared_ptr<core::FlowFile> &merge_flow); + void mergeAttributes(core::ProcessSession *session, const std::shared_ptr<core::FlowFile> &merge_flow); virtual ~AttributeMerger() = default; protected: std::map<std::string, std::string> getMergedAttributes(); - virtual void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string>& merged_attributes) = 0; + virtual void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) = 0; const std::deque<std::shared_ptr<core::FlowFile>> &flows_; }; @@ -283,7 +284,7 @@ public: explicit 
KeepOnlyCommonAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows) : AttributeMerger(flows) {} protected: - void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string>& merged_attributes) override; + void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override; }; class KeepAllUniqueAttributesMerger: public AttributeMerger { @@ -291,7 +292,7 @@ public: explicit KeepAllUniqueAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows) : AttributeMerger(flows) {} protected: - void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string>& merged_attributes) override; + void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override; private: std::vector<std::string> removed_attributes_; @@ -352,6 +353,8 @@ public: bool checkDefragment(std::unique_ptr<Bin> &bin); private: + void validatePropertyOptions(); + std::shared_ptr<logging::Logger> logger_; std::string mergeStrategy_; std::string mergeFormat_; diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp index 6c1ec65..370987f 100644 --- a/libminifi/test/archive-tests/MergeFileTests.cpp +++ b/libminifi/test/archive-tests/MergeFileTests.cpp @@ -844,7 +844,7 @@ TEST_CASE("Test Merge File Attributes Keeping Only Common Attributes", "[testMer } } - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE); 
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT); context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT); @@ -859,6 +859,7 @@ TEST_CASE("Test Merge File Attributes Keeping Only Common Attributes", "[testMer flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0)); flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i)); flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3)); + flow->setAttribute("mime.type", "application/octet-stream"); if (i == 1) flow->setAttribute("tagUnique1", "unique1"); else if (i == 2) @@ -892,6 +893,7 @@ TEST_CASE("Test Merge File Attributes Keeping Only Common Attributes", "[testMer REQUIRE(attributes.find("tagUnique1") == attributes.end()); REQUIRE(attributes.find("tagUnique2") == attributes.end()); REQUIRE(attributes["tagCommon"] == "common"); + REQUIRE(attributes["mime.type"] == "application/tar"); LogTestController::getInstance().reset(); } @@ -917,7 +919,7 @@ TEST_CASE("Test Merge File Attributes Keeping All Unique Attributes", "[testMerg } } - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE); context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT); context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, 
org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT); context->setProperty(org::apache::nifi::minifi::processors::MergeContent::AttributeStrategy, org::apache::nifi::minifi::processors::merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE); @@ -933,6 +935,7 @@ TEST_CASE("Test Merge File Attributes Keeping All Unique Attributes", "[testMerg flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0)); flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i)); flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3)); + flow->setAttribute("mime.type", "application/octet-stream"); if (i == 1) flow->setAttribute("tagUnique1", "unique1"); else if (i == 2) @@ -966,6 +969,7 @@ TEST_CASE("Test Merge File Attributes Keeping All Unique Attributes", "[testMerg REQUIRE(attributes["tagUnique1"] == "unique1"); REQUIRE(attributes["tagUnique2"] == "unique2"); REQUIRE(attributes["tagCommon"] == "common"); + REQUIRE(attributes["mime.type"] == "application/tar"); LogTestController::getInstance().reset(); }
