adamdebreceni commented on a change in pull request #792:
URL: https://github.com/apache/nifi-minifi-cpp/pull/792#discussion_r429760668



##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -275,785 +312,451 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
         expectfileSecond << "demarcator";
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
     expectfileFirst << "footer";
     expectfileSecond << "footer";
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_DEFRAGMENT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_FILENAME);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header,
 "/tmp/minifi-mergecontent.header");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer,
 "/tmp/minifi-mergecontent.footer");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator,
 "/tmp/minifi-mergecontent.demarcator");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[4]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = 
connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = 
connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 128);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 128);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-    unlink(FOOTER_FILE);
-    unlink(HEADER_FILE);
-    unlink(DEMARCATOR_FILE);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, HEADER_FILE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, FOOTER_FILE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to 
one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[4]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 128);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
   }
+  REQUIRE(flow2->getSize() == 128);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file, drop record 4
     for (int i = 0; i < 6; i++) {
       if (i == 4)
         continue;
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_DEFRAGMENT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge,
 "1 sec");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
+  }
 
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, 
std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, 
std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, 
std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
-    {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = 
connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = 
connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 64);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to 
one
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+  {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 64);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = 
connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = 
connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 96);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), 
callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_CONCAT_VALUE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to 
one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 96);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), 
std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 
 TEST_CASE("MergeFileTar", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, 
std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, 
std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_TAR_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_TAR_VALUE);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
+  
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to 
one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < 
core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 0; i < 3; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = 
connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = 
connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 0; i < 3; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> 
(callback.archive_buffer_[i]), callback.archive_buffer_size_[i]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
-    }
-    REQUIRE(flow2->getSize() > 0);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 3; i < 6; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> 
(callback.archive_buffer_[i-3]), callback.archive_buffer_size_[i-3]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
+      REQUIRE(archives[i].to_string() == contents);
     }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
+  }
+  REQUIRE(flow2->getSize() > 0);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 3; i < 6; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), 
std::istreambuf_iterator<char>());
+      REQUIRE(archives[i-3].to_string() == contents);
     }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
   }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileZip", "[mergefiletest5]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, 
std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, 
std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + 
std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = 
std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-    
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(repo, content_repo, 
"logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful 
output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat,
 MERGE_FORMAT_ZIP_VALUE);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy,
 MERGE_STRATEGY_BIN_PACK);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey,
 DELIMITER_STRATEGY_TEXT);
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize,
 "96");
-    
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName,
 "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged 
to one
-    std::shared_ptr<core::Connectable> income = 
node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = 
std::static_pointer_cast<minifi::Connection>(income);

Review comment:
       It seems to me that I have already moved this one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to