arpadboda commented on a change in pull request #792:
URL: https://github.com/apache/nifi-minifi-cpp/pull/792#discussion_r429677443



##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -37,88 +37,112 @@
 #include "../TestBase.h"
 #include "../unit/ProvenanceTestHelper.h"
 
-static const char* FLOW_FILE = "/tmp/minifi-mergecontent";
-static const char* EXPECT_MERGE_CONTENT_FIRST = 
"/tmp/minifi-expect-mergecontent1.txt";
-static const char* EXPECT_MERGE_CONTENT_SECOND = 
"/tmp/minifi-expect-mergecontent2.txt";
-static const char* HEADER_FILE = "/tmp/minifi-mergecontent.header";
-static const char* FOOTER_FILE = "/tmp/minifi-mergecontent.footer";
-static const char* DEMARCATOR_FILE = "/tmp/minifi-mergecontent.demarcator";
-
-class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
+std::string FLOW_FILE;
+std::string EXPECT_MERGE_CONTENT_FIRST;
+std::string EXPECT_MERGE_CONTENT_SECOND;
+std::string HEADER_FILE;
+std::string FOOTER_FILE;
+std::string DEMARCATOR_FILE;
+
+void init_file_paths() {
+  struct Initializer {
+    Initializer() {
+      static TestController global_controller;
+      char format[] = "/tmp/test.XXXXXX";
+      std::string tempDir = global_controller.createTempDirectory(format);
+      FLOW_FILE = utils::file::FileUtils::concat_path(tempDir, 
"minifi-mergecontent");
+      EXPECT_MERGE_CONTENT_FIRST = 
utils::file::FileUtils::concat_path(tempDir, "minifi-expect-mergecontent1.txt");
+      EXPECT_MERGE_CONTENT_SECOND = 
utils::file::FileUtils::concat_path(tempDir, "minifi-expect-mergecontent2.txt");
+      HEADER_FILE = utils::file::FileUtils::concat_path(tempDir, 
"minifi-mergecontent.header");
+      FOOTER_FILE = utils::file::FileUtils::concat_path(tempDir, 
"minifi-mergecontent.footer");
+      DEMARCATOR_FILE = utils::file::FileUtils::concat_path(tempDir, 
"minifi-mergecontent.demarcator");
+    }
+  };
+  static Initializer initializer;
+}
+
+class FixedBuffer : public org::apache::nifi::minifi::InputStreamCallback {
  public:
-  explicit ReadCallback(uint64_t size) :
-      read_size_(0) {
-    buffer_size_ = size;
-    buffer_ = new uint8_t[buffer_size_];
-    archive_buffer_num_ = 0;
+  explicit FixedBuffer(std::size_t capacity) : capacity_(capacity) {
+    buf_ = new uint8_t[capacity_];

Review comment:
       Is there any reason that prevents using a std::unique_ptr here?

##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -275,785 +312,451 @@ TEST_CASE("MergeFileDefragmentDelimiter", 
"[mergefiletest2]") {
         expectfileSecond << "demarcator";
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
     expectfileFirst << "footer";
     expectfileSecond << "footer";
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_DEFRAGMENT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_FILENAME);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header,
 "/tmp/minifi-mergecontent.header");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer,
 "/tmp/minifi-mergecontent.footer");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator,
 "/tmp/minifi-mergecontent.demarcator");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[4]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = 
connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = 
connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 128);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 128);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-    unlink(FOOTER_FILE);
-    unlink(HEADER_FILE);
-    unlink(DEMARCATOR_FILE);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_DEFRAGMENT);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_FILENAME);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header,
 HEADER_FILE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer,
 FOOTER_FILE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator,
 DEMARCATOR_FILE);
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to 
one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[4]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 128);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
   }
+  REQUIRE(flow2->getSize() == 128);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, 
std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, 
std::ios::binary);
 
     // Create and write to the test file, drop record 4
     for (int i = 0; i < 6; i++) {
       if (i == 4)
         continue;
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_DEFRAGMENT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge,
 "1 sec");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
+  }
 
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
-    {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = 
connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = 
connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 64);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_DEFRAGMENT);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge,
 "1 sec");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to 
one
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+  {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 64);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, 
std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, 
std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = 
connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = 
connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 96);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to 
one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 96);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 
 TEST_CASE("MergeFileTar", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, 
std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, 
std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_TAR_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_TAR_VALUE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to 
one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 0; i < 3; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = 
connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = 
connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 0; i < 3; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> 
(callback.archive_buffer_[i]), callback.archive_buffer_size_[i]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
-    }
-    REQUIRE(flow2->getSize() > 0);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 3; i < 6; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> 
(callback.archive_buffer_[i-3]), callback.archive_buffer_size_[i-3]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
+      REQUIRE(archives[i].to_string() == contents);
     }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
+  }
+  REQUIRE(flow2->getSize() > 0);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 3; i < 6; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
+      REQUIRE(archives[i-3].to_string() == contents);
     }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
   }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileZip", "[mergefiletest5]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, 
std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, 
std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_ZIP_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);

Review comment:
       This part also seems to be very similar in all the test cases in this 
file.
   Would it be possible to extract it into a function to avoid copy-pasting it?
   
   The changes here look good; my goal is just to improve the code further. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to