This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 365523f59fcb3a6daf9b226bd6d5f79192fd7b2f
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Aug 31 16:47:38 2020 +0200

    MINIFICPP-1321 Do not override MergeContent output attributes
    
    This commit handles the case where the flowfiles processed by MergeContent
    share a common attribute with the same value (such as the MIME type),
    which, after the attribute merge, would override the output attribute
    that MergeContent sets itself.
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #888
---
 extensions/libarchive/MergeContent.cpp          | 107 +++++++++++++++---------
 extensions/libarchive/MergeContent.h            |  29 ++++---
 libminifi/test/archive-tests/MergeFileTests.cpp |   8 +-
 3 files changed, 88 insertions(+), 56 deletions(-)

diff --git a/extensions/libarchive/MergeContent.cpp 
b/extensions/libarchive/MergeContent.cpp
index 25a32d1..c0761e5 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -147,6 +147,9 @@ void MergeContent::onSchedule(core::ProcessContext 
*context, core::ProcessSessio
   if (context->getProperty(AttributeStrategy.getName(), value) && 
!value.empty()) {
     attributeStrategy_ = value;
   }
+
+  validatePropertyOptions();
+
   if (mergeStrategy_ == merge_content_options::MERGE_STRATEGY_DEFRAGMENT) {
     binManager_.setFileCount(FRAGMENT_COUNT_ATTRIBUTE);
   }
@@ -170,7 +173,34 @@ void MergeContent::onSchedule(core::ProcessContext 
*context, core::ProcessSessio
   }
 }
 
-std::string MergeContent::getGroupId(core::ProcessContext *context, 
std::shared_ptr<core::FlowFile> flow) {
+void MergeContent::validatePropertyOptions() {  // Logs and throws PROCESSOR_EXCEPTION on the first unsupported option value.
+  if (mergeStrategy_ != merge_content_options::MERGE_STRATEGY_DEFRAGMENT &&
+      mergeStrategy_ != merge_content_options::MERGE_STRATEGY_BIN_PACK) {
+    logger_->log_error("Merge strategy not supported %s", mergeStrategy_);
+    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid merge strategy: " + mergeStrategy_);  // report the merge strategy, not the attribute strategy
+  }
+
+  if (mergeFormat_ != merge_content_options::MERGE_FORMAT_CONCAT_VALUE &&
+      mergeFormat_ != merge_content_options::MERGE_FORMAT_TAR_VALUE &&
+      mergeFormat_ != merge_content_options::MERGE_FORMAT_ZIP_VALUE) {
+    logger_->log_error("Merge format not supported %s", mergeFormat_);
+    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid merge format: " + mergeFormat_);
+  }
+
+  if (delimiterStrategy_ != merge_content_options::DELIMITER_STRATEGY_FILENAME &&
+      delimiterStrategy_ != merge_content_options::DELIMITER_STRATEGY_TEXT) {
+    logger_->log_error("Delimiter strategy not supported %s", delimiterStrategy_);
+    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid delimiter strategy: " + delimiterStrategy_);
+  }
+
+  if (attributeStrategy_ != merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON &&
+      attributeStrategy_ != merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) {
+    logger_->log_error("Attribute strategy not supported %s", attributeStrategy_);
+    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid attribute strategy: " + attributeStrategy_);
+  }
+}
+
+std::string MergeContent::getGroupId(core::ProcessContext*, 
std::shared_ptr<core::FlowFile> flow) {
   std::string groupId = "";
   std::string value;
   if (!correlationAttributeName_.empty()) {
@@ -258,6 +288,16 @@ bool MergeContent::processBin(core::ProcessContext 
*context, core::ProcessSessio
         });
   }
 
+  std::shared_ptr<core::FlowFile> merge_flow = 
std::static_pointer_cast<FlowFileRecord>(session->create());
+  if (attributeStrategy_ == 
merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON)
+    
KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, 
merge_flow);
+  else if (attributeStrategy_ == 
merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE)
+    KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, 
merge_flow);
+  else {
+    logger_->log_error("Attribute strategy not supported %s", 
attributeStrategy_);
+    return false;
+  }
+
   std::unique_ptr<MergeBin> mergeBin;
   if (mergeFormat_ == merge_content_options::MERGE_FORMAT_CONCAT_VALUE)
     mergeBin = utils::make_unique<BinaryConcatenationMerge>();
@@ -270,41 +310,31 @@ bool MergeContent::processBin(core::ProcessContext 
*context, core::ProcessSessio
     return false;
   }
 
-  std::shared_ptr<core::FlowFile> mergeFlow;
   try {
-    mergeFlow = mergeBin->merge(context, session, bin->getFlowFile(), 
headerContent_, footerContent_, demarcatorContent_);
+    mergeBin->merge(context, session, bin->getFlowFile(), headerContent_, 
footerContent_, demarcatorContent_, merge_flow);
   } catch (...) {
     logger_->log_error("Merge Content merge catch exception");
     return false;
   }
-  session->putAttribute(mergeFlow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(bin->getSize()));
-
-  if (attributeStrategy_ == 
merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON)
-    
KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, 
mergeFlow);
-  else if (attributeStrategy_ == 
merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE)
-    KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, 
mergeFlow);
-  else {
-    logger_->log_error("Attribute strategy not supported %s", 
attributeStrategy_);
-    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid 
attribute strategy: " + attributeStrategy_);
-  }
+  session->putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(bin->getSize()));
 
   // we successfully merge the flow
-  session->transfer(mergeFlow, Merge);
+  session->transfer(merge_flow, Merge);
   std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
   for (auto flow : flows) {
     session->transfer(flow, Original);
   }
-  logger_->log_info("Merge FlowFile record UUID %s, payload length %d", 
mergeFlow->getUUIDStr(), mergeFlow->getSize());
+  logger_->log_info("Merge FlowFile record UUID %s, payload length %d", 
merge_flow->getUUIDStr(), merge_flow->getSize());
 
   return true;
 }
 
-std::shared_ptr<core::FlowFile> 
BinaryConcatenationMerge::merge(core::ProcessContext *context, 
core::ProcessSession *session,
-        std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string 
&header, std::string &footer, std::string &demarcator) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->create());
+void BinaryConcatenationMerge::merge(core::ProcessContext*, 
core::ProcessSession *session,
+    std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, 
std::string &footer, std::string &demarcator,
+    const std::shared_ptr<core::FlowFile> &merge_flow) {
   BinaryConcatenationMerge::WriteCallback callback(header, footer, demarcator, 
flows, session);
-  session->write(flowFile, &callback);
-  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), 
getMergedContentType());
+  session->write(merge_flow, &callback);
+  session->putAttribute(merge_flow, FlowAttributeKey(MIME_TYPE), 
getMergedContentType());
   std::string fileName;
   if (flows.size() == 1) {
     flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
@@ -312,18 +342,16 @@ std::shared_ptr<core::FlowFile> 
BinaryConcatenationMerge::merge(core::ProcessCon
     flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
   }
   if (!fileName.empty())
-    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
-  return flowFile;
+    session->putAttribute(merge_flow, FlowAttributeKey(FILENAME), fileName);
 }
 
-std::shared_ptr<core::FlowFile> TarMerge::merge(core::ProcessContext *context, 
core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> 
&flows, std::string &header,
-    std::string &footer, std::string &demarcator) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->create());
+void TarMerge::merge(core::ProcessContext*, core::ProcessSession *session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string&,
+    std::string&, std::string&, const std::shared_ptr<core::FlowFile> 
&merge_flow) {
   ArchiveMerge::WriteCallback 
callback(std::string(merge_content_options::MERGE_FORMAT_TAR_VALUE), flows, 
session);
-  session->write(flowFile, &callback);
-  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), 
getMergedContentType());
+  session->write(merge_flow, &callback);
+  session->putAttribute(merge_flow, FlowAttributeKey(MIME_TYPE), 
getMergedContentType());
   std::string fileName;
-  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  merge_flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
   if (flows.size() == 1) {
     flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
   } else {
@@ -331,19 +359,17 @@ std::shared_ptr<core::FlowFile> 
TarMerge::merge(core::ProcessContext *context, c
   }
   if (!fileName.empty()) {
     fileName += ".tar";
-    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+    session->putAttribute(merge_flow, FlowAttributeKey(FILENAME), fileName);
   }
-  return flowFile;
 }
 
-std::shared_ptr<core::FlowFile> ZipMerge::merge(core::ProcessContext *context, 
core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> 
&flows, std::string &header,
-    std::string &footer, std::string &demarcator) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->create());
+void ZipMerge::merge(core::ProcessContext*, core::ProcessSession *session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string&,
+    std::string&, std::string&, const std::shared_ptr<core::FlowFile> 
&merge_flow) {
   ArchiveMerge::WriteCallback 
callback(std::string(merge_content_options::MERGE_FORMAT_ZIP_VALUE), flows, 
session);
-  session->write(flowFile, &callback);
-  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), 
getMergedContentType());
+  session->write(merge_flow, &callback);
+  session->putAttribute(merge_flow, FlowAttributeKey(MIME_TYPE), 
getMergedContentType());
   std::string fileName;
-  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  merge_flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
   if (flows.size() == 1) {
     flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
   } else {
@@ -351,12 +377,11 @@ std::shared_ptr<core::FlowFile> 
ZipMerge::merge(core::ProcessContext *context, c
   }
   if (!fileName.empty()) {
     fileName += ".zip";
-    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+    session->putAttribute(merge_flow, FlowAttributeKey(FILENAME), fileName);
   }
-  return flowFile;
 }
 
-void AttributeMerger::mergeAttributes(core::ProcessSession *session, 
std::shared_ptr<core::FlowFile> &merge_flow) {
+void AttributeMerger::mergeAttributes(core::ProcessSession *session, const 
std::shared_ptr<core::FlowFile> &merge_flow) {
   for (const auto& pair : getMergedAttributes()) {
     session->putAttribute(merge_flow, pair.first, pair.second);
   }
@@ -372,7 +397,7 @@ std::map<std::string, std::string> 
AttributeMerger::getMergedAttributes() {
   return *std::accumulate(std::next(flows_.cbegin()), flows_.cend(), &sum, 
merge_attributes);
 }
 
-void KeepOnlyCommonAttributesMerger::processFlowFile(const 
std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string>& 
merged_attributes) {
+void KeepOnlyCommonAttributesMerger::processFlowFile(const 
std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> 
&merged_attributes) {
   auto flow_attributes = flow_file->getAttributes();
   std::map<std::string, std::string> tmp_merged;
   std::set_intersection(std::make_move_iterator(merged_attributes.begin()), 
std::make_move_iterator(merged_attributes.end()),
@@ -380,7 +405,7 @@ void KeepOnlyCommonAttributesMerger::processFlowFile(const 
std::shared_ptr<core:
   merged_attributes = std::move(tmp_merged);
 }
 
-void KeepAllUniqueAttributesMerger::processFlowFile(const 
std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string>& 
merged_attributes) {
+void KeepAllUniqueAttributesMerger::processFlowFile(const 
std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> 
&merged_attributes) {
   auto flow_attributes = flow_file->getAttributes();
   for (auto&& attr : flow_attributes) {
     if(std::find(removed_attributes_.cbegin(), removed_attributes_.cend(), 
attr.first) != removed_attributes_.cend()) {
diff --git a/extensions/libarchive/MergeContent.h 
b/extensions/libarchive/MergeContent.h
index 32475a1..80ef0f7 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -55,11 +55,11 @@ class MergeBin {
 public:
 
   virtual ~MergeBin() = default;
-
   virtual std::string getMergedContentType() = 0;
   // merge the flows in the bin
-  virtual std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, 
core::ProcessSession *session,
-      std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, 
std::string &footer, std::string &demarcator) = 0;
+  virtual void merge(core::ProcessContext *context, core::ProcessSession 
*session,
+      std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, 
std::string &footer, std::string &demarcator,
+      const std::shared_ptr<core::FlowFile> &flowFile) = 0;
 };
 
 // BinaryConcatenationMerge Class
@@ -69,8 +69,9 @@ public:
   std::string getMergedContentType() {
     return mimeType;
   }
-  std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, 
core::ProcessSession *session,
-          std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string 
&header, std::string &footer, std::string &demarcator);
+  virtual void merge(
+    core::ProcessContext *context, core::ProcessSession *session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows,
+    std::string &header, std::string &footer, std::string &demarcator, const 
std::shared_ptr<core::FlowFile> &flowFile) override;
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
@@ -247,8 +248,8 @@ public:
 class TarMerge: public ArchiveMerge, public MergeBin {
 public:
   static const char *mimeType;
-  std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, 
core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> 
&flows, std::string &header, std::string &footer,
-        std::string &demarcator);
+  void merge(core::ProcessContext *context, core::ProcessSession *session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, 
std::string &footer,
+    std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) 
override;
   std::string getMergedContentType() {
     return mimeType;
   }
@@ -258,8 +259,8 @@ public:
 class ZipMerge: public ArchiveMerge, public MergeBin {
 public:
   static const char *mimeType;
-  std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, 
core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> 
&flows, std::string &header, std::string &footer,
-        std::string &demarcator);
+  void merge(core::ProcessContext *context, core::ProcessSession *session, 
std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, 
std::string &footer,
+    std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) 
override;
   std::string getMergedContentType() {
     return mimeType;
   }
@@ -269,11 +270,11 @@ class AttributeMerger {
 public:
   explicit 
AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>>
 &flows)
     : flows_(flows) {}
-  void mergeAttributes(core::ProcessSession *session, 
std::shared_ptr<core::FlowFile> &merge_flow);
+  void mergeAttributes(core::ProcessSession *session, const 
std::shared_ptr<core::FlowFile> &merge_flow);
   virtual ~AttributeMerger() = default;
 protected:
   std::map<std::string, std::string> getMergedAttributes();
-  virtual void processFlowFile(const std::shared_ptr<core::FlowFile> 
&flow_file, std::map<std::string, std::string>& merged_attributes) = 0;
+  virtual void processFlowFile(const std::shared_ptr<core::FlowFile> 
&flow_file, std::map<std::string, std::string> &merged_attributes) = 0;
 
   const std::deque<std::shared_ptr<core::FlowFile>> &flows_;
 };
@@ -283,7 +284,7 @@ public:
   explicit 
KeepOnlyCommonAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>>
 &flows)
     : AttributeMerger(flows) {}
 protected:
-  void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, 
std::map<std::string, std::string>& merged_attributes) override;
+  void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, 
std::map<std::string, std::string> &merged_attributes) override;
 };
 
 class KeepAllUniqueAttributesMerger: public AttributeMerger {
@@ -291,7 +292,7 @@ public:
   explicit 
KeepAllUniqueAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>>
 &flows)
     : AttributeMerger(flows) {}
 protected:
-  void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, 
std::map<std::string, std::string>& merged_attributes) override;
+  void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, 
std::map<std::string, std::string> &merged_attributes) override;
 
 private:
   std::vector<std::string> removed_attributes_;
@@ -352,6 +353,8 @@ public:
   bool checkDefragment(std::unique_ptr<Bin> &bin);
 
  private:
+  void validatePropertyOptions();
+
   std::shared_ptr<logging::Logger> logger_;
   std::string mergeStrategy_;
   std::string mergeFormat_;
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp 
b/libminifi/test/archive-tests/MergeFileTests.cpp
index 6c1ec65..370987f 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -844,7 +844,7 @@ TEST_CASE("Test Merge File Attributes Keeping Only Common 
Attributes", "[testMer
     }
   }
 
-  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 
org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 
org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
   
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 
org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
   
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy,
 
org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
 
@@ -859,6 +859,7 @@ TEST_CASE("Test Merge File Attributes Keeping Only Common 
Attributes", "[testMer
     flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(0));
     flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i));
     flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(3));
+    flow->setAttribute("mime.type", "application/octet-stream");
     if (i == 1)
       flow->setAttribute("tagUnique1", "unique1");
     else if (i == 2)
@@ -892,6 +893,7 @@ TEST_CASE("Test Merge File Attributes Keeping Only Common 
Attributes", "[testMer
   REQUIRE(attributes.find("tagUnique1") == attributes.end());
   REQUIRE(attributes.find("tagUnique2") == attributes.end());
   REQUIRE(attributes["tagCommon"] == "common");
+  REQUIRE(attributes["mime.type"] == "application/tar");
 
   LogTestController::getInstance().reset();
 }
@@ -917,7 +919,7 @@ TEST_CASE("Test Merge File Attributes Keeping All Unique 
Attributes", "[testMerg
     }
   }
 
-  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 
org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 
org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
   
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 
org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
   
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy,
 
org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
   
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::AttributeStrategy,
 
org::apache::nifi::minifi::processors::merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE);
@@ -933,6 +935,7 @@ TEST_CASE("Test Merge File Attributes Keeping All Unique 
Attributes", "[testMerg
     flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(0));
     flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i));
     flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(3));
+    flow->setAttribute("mime.type", "application/octet-stream");
     if (i == 1)
       flow->setAttribute("tagUnique1", "unique1");
     else if (i == 2)
@@ -966,6 +969,7 @@ TEST_CASE("Test Merge File Attributes Keeping All Unique 
Attributes", "[testMerg
   REQUIRE(attributes["tagUnique1"] == "unique1");
   REQUIRE(attributes["tagUnique2"] == "unique2");
   REQUIRE(attributes["tagCommon"] == "common");
+  REQUIRE(attributes["mime.type"] == "application/tar");
 
   LogTestController::getInstance().reset();
 }

Reply via email to