adamdebreceni commented on a change in pull request #792: URL: https://github.com/apache/nifi-minifi-cpp/pull/792#discussion_r429760668
########## File path: libminifi/test/archive-tests/MergeFileTests.cpp ########## @@ -275,785 +312,451 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") { expectfileSecond << "demarcator"; std::ofstream tmpfile; std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; - tmpfile.open(flowFileName.c_str()); + tmpfile.open(flowFileName.c_str(), std::ios::binary); for (int j = 0; j < 32; j++) { tmpfile << std::to_string(i); if (i < 3) expectfileFirst << std::to_string(i); else expectfileSecond << std::to_string(i); } - tmpfile.close(); } expectfileFirst << "footer"; expectfileSecond << "footer"; - expectfileFirst.close(); - expectfileSecond.close(); - - TestController testController; - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); - LogTestController::getInstance().setTrace<core::ProcessSession>(); - LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent"); - std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - processor->initialize(); - utils::Identifier processoruuid; - REQUIRE(true == 
processor->getUUID(processoruuid)); - utils::Identifier logAttributeuuid; - REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); - - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); - // connection from merge processor to log attribute - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); - connection->addRelationship(core::Relationship("merged", "Merge successful output")); - connection->setSource(processor); - connection->setDestination(logAttributeProcessor); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(logAttributeuuid); - processor->addConnection(connection); - // connection to merge processor - std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection"); - mergeconnection->setDestination(processor); - mergeconnection->setDestinationUUID(processoruuid); - processor->addConnection(mergeconnection); - - std::set<core::Relationship> autoTerminatedRelationships; - core::Relationship original("original", ""); - core::Relationship failure("failure", ""); - autoTerminatedRelationships.insert(original); - autoTerminatedRelationships.insert(failure); - processor->setAutoTerminatedRelationships(autoTerminatedRelationships); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - logAttributeProcessor->incrementActiveTasks(); - logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + } - std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, "/tmp/minifi-mergecontent.header"); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, "/tmp/minifi-mergecontent.footer"); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, "/tmp/minifi-mergecontent.demarcator"); - - core::ProcessSession sessionGenFlowFile(context); - std::shared_ptr<core::FlowFile> record[6]; - - // Generate 6 flowfiles, first threes merged to one, second thress merged to one - std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); - std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); - for (int i = 0; i < 6; i++) { - std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); - std::string flowFileName = std::string(FLOW_FILE) + "." 
+ std::to_string(i) + ".txt"; - sessionGenFlowFile.import(flowFileName, flow, true, 0); - // three bundle - if (i < 3) - flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0)); - else - flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1)); - if (i < 3) - flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i)); - else - flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3)); - flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3)); - record[i] = flow; - } - income_connection->put(record[0]); - income_connection->put(record[2]); - income_connection->put(record[5]); - income_connection->put(record[4]); - income_connection->put(record[1]); - income_connection->put(record[3]); - - REQUIRE(processor->getName() == "mergecontent"); - auto factory = std::make_shared<core::ProcessSessionFactory>(context); - processor->onSchedule(context, factory); - for (int i = 0; i < 6; i++) { - auto session = std::make_shared<core::ProcessSession>(context); - processor->onTrigger(context, session); - session->commit(); - } - // validate the merge content - std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; - std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); - std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords); - REQUIRE(flow1->getSize() == 128); - { - ReadCallback callback(flow1->getSize()); - sessionGenFlowFile.read(flow1, &callback); - std::ifstream file1; - file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in); - std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); - std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); - REQUIRE(expectContents == contents); - file1.close(); - } - REQUIRE(flow2->getSize() == 128); - { - ReadCallback callback(flow2->getSize()); - 
sessionGenFlowFile.read(flow2, &callback); - std::ifstream file2; - file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in); - std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>()); - std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); - REQUIRE(expectContents == contents); - file2.close(); - } - LogTestController::getInstance().reset(); - for (int i = 0; i < 6; i++) { - std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; - unlink(flowFileName.c_str()); - } - unlink(EXPECT_MERGE_CONTENT_FIRST); - unlink(EXPECT_MERGE_CONTENT_SECOND); - unlink(FOOTER_FILE); - unlink(HEADER_FILE); - unlink(DEMARCATOR_FILE); - } catch (...) { + MergeTestController testController; + auto context = testController.context; + auto processor = testController.processor; + auto input = testController.input; + auto output = testController.output; + + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, HEADER_FILE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, FOOTER_FILE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE); + + core::ProcessSession sessionGenFlowFile(context); + std::shared_ptr<core::FlowFile> record[6]; + + // Generate 6 flowfiles, first threes merged to one, second thress merged to one + for (int i = 0; i < 6; i++) { + std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + std::string flowFileName = std::string(FLOW_FILE) + 
"." + std::to_string(i) + ".txt"; + sessionGenFlowFile.import(flowFileName, flow, true, 0); + // three bundle + if (i < 3) + flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0)); + else + flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1)); + if (i < 3) + flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i)); + else + flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3)); + flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3)); + record[i] = flow; + } + input->put(record[0]); + input->put(record[2]); + input->put(record[5]); + input->put(record[4]); + input->put(record[1]); + input->put(record[3]); + + REQUIRE(processor->getName() == "mergecontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + for (int i = 0; i < 6; i++) { + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); + } + // validate the merge content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() == 128); + { + FixedBuffer callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + REQUIRE(callback.to_string() == contents); } + REQUIRE(flow2->getSize() == 128); + { + FixedBuffer callback(flow2->getSize()); + sessionGenFlowFile.read(flow2, &callback); + std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary); + std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>()); + 
REQUIRE(callback.to_string() == contents); + } + LogTestController::getInstance().reset(); } TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") { - try { - std::ofstream expectfileFirst; - std::ofstream expectfileSecond; - - expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST); - expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND); + { + std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary); + std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary); // Create and write to the test file, drop record 4 for (int i = 0; i < 6; i++) { if (i == 4) continue; std::ofstream tmpfile; std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; - tmpfile.open(flowFileName.c_str()); + tmpfile.open(flowFileName.c_str(), std::ios::binary); for (int j = 0; j < 32; j++) { tmpfile << std::to_string(i); if (i < 3) expectfileFirst << std::to_string(i); else expectfileSecond << std::to_string(i); } - tmpfile.close(); } - expectfileFirst.close(); - expectfileSecond.close(); - - TestController testController; - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); - LogTestController::getInstance().setTrace<core::ProcessSession>(); - LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - 
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent"); - std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - processor->initialize(); - utils::Identifier processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - utils::Identifier logAttributeuuid; - REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); - - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); - // connection from merge processor to log attribute - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); - connection->addRelationship(core::Relationship("merged", "Merge successful output")); - connection->setSource(processor); - connection->setDestination(logAttributeProcessor); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(logAttributeuuid); - processor->addConnection(connection); - // connection to merge processor - std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection"); - mergeconnection->setDestination(processor); - mergeconnection->setDestinationUUID(processoruuid); - processor->addConnection(mergeconnection); - - std::set<core::Relationship> autoTerminatedRelationships; - core::Relationship original("original", ""); - core::Relationship failure("failure", ""); - autoTerminatedRelationships.insert(original); - autoTerminatedRelationships.insert(failure); - processor->setAutoTerminatedRelationships(autoTerminatedRelationships); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - 
logAttributeProcessor->incrementActiveTasks(); - logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); - - std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec"); - - core::ProcessSession sessionGenFlowFile(context); - std::shared_ptr<core::FlowFile> record[6]; + } - // Generate 6 flowfiles, first threes merged to one, second thress merged to one - std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); - std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); - for (int i = 0; i < 6; i++) { - if (i == 4) - continue; - std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); - std::string flowFileName = std::string(FLOW_FILE) + "." 
+ std::to_string(i) + ".txt"; - sessionGenFlowFile.import(flowFileName, flow, true, 0); - // three bundle - if (i < 3) - flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0)); - else - flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1)); - if (i < 3) - flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i)); - else - flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3)); - flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3)); - record[i] = flow; - } - income_connection->put(record[0]); - income_connection->put(record[2]); - income_connection->put(record[5]); - income_connection->put(record[1]); - income_connection->put(record[3]); - - REQUIRE(processor->getName() == "mergecontent"); - auto factory = std::make_shared<core::ProcessSessionFactory>(context); - processor->onSchedule(context, factory); - for (int i = 0; i < 6; i++) { - if (i == 4) - continue; - auto session = std::make_shared<core::ProcessSession>(context); - processor->onTrigger(context, session); - session->commit(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - { - auto session = std::make_shared<core::ProcessSession>(context); - processor->onTrigger(context, session); - } - // validate the merge content - std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; - std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); - std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords); - REQUIRE(flow1->getSize() == 96); - { - ReadCallback callback(flow1->getSize()); - sessionGenFlowFile.read(flow1, &callback); - std::ifstream file1; - file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in); - std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); - std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); - 
REQUIRE(expectContents == contents); - file1.close(); - } - REQUIRE(flow2->getSize() == 64); - { - ReadCallback callback(flow2->getSize()); - sessionGenFlowFile.read(flow2, &callback); - std::ifstream file2; - file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in); - std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>()); - std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); - REQUIRE(expectContents == contents); - file2.close(); - } - LogTestController::getInstance().reset(); - for (int i = 0; i < 6; i++) { - if (i == 4) - continue; - std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; - unlink(flowFileName.c_str()); - } - unlink(EXPECT_MERGE_CONTENT_FIRST); - unlink(EXPECT_MERGE_CONTENT_SECOND); - } catch (...) { + MergeTestController testController; + auto context = testController.context; + auto processor = testController.processor; + auto input = testController.input; + auto output = testController.output; + + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec"); + + core::ProcessSession sessionGenFlowFile(context); + std::shared_ptr<core::FlowFile> record[6]; + + // Generate 6 flowfiles, first threes merged to one, second thress merged to one + for (int i = 0; i < 6; i++) { + if (i == 4) + continue; + std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + std::string flowFileName = std::string(FLOW_FILE) + "." 
+ std::to_string(i) + ".txt"; + sessionGenFlowFile.import(flowFileName, flow, true, 0); + // three bundle + if (i < 3) + flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0)); + else + flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1)); + if (i < 3) + flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i)); + else + flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3)); + flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3)); + record[i] = flow; + } + input->put(record[0]); + input->put(record[2]); + input->put(record[5]); + input->put(record[1]); + input->put(record[3]); + + REQUIRE(processor->getName() == "mergecontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + for (int i = 0; i < 6; i++) { + if (i == 4) + continue; + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); } + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + { + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + } + // validate the merge content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() == 96); + { + FixedBuffer callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + REQUIRE(callback.to_string() == contents); + } + REQUIRE(flow2->getSize() == 64); + { + FixedBuffer callback(flow2->getSize()); + sessionGenFlowFile.read(flow2, &callback); + 
std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary); + std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>()); + REQUIRE(callback.to_string() == contents); + } + LogTestController::getInstance().reset(); } TEST_CASE("MergeFileBinPack", "[mergefiletest4]") { - try { - std::ofstream expectfileFirst; - std::ofstream expectfileSecond; - expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST); - expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND); + { + std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary); + std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary); // Create and write to the test file for (int i = 0; i < 6; i++) { std::ofstream tmpfile; std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; - tmpfile.open(flowFileName.c_str()); + tmpfile.open(flowFileName.c_str(), std::ios::binary); for (int j = 0; j < 32; j++) { tmpfile << std::to_string(i); if (i < 3) expectfileFirst << std::to_string(i); else expectfileSecond << std::to_string(i); } - tmpfile.close(); } - expectfileFirst.close(); - expectfileSecond.close(); - - TestController testController; - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); - LogTestController::getInstance().setTrace<core::ProcessSession>(); - LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); - 
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent"); - std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - processor->initialize(); - utils::Identifier processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - utils::Identifier logAttributeuuid; - REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); - - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); - // connection from merge processor to log attribute - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); - connection->addRelationship(core::Relationship("merged", "Merge successful output")); - connection->setSource(processor); - connection->setDestination(logAttributeProcessor); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(logAttributeuuid); - processor->addConnection(connection); - // connection to merge processor - std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection"); - mergeconnection->setDestination(processor); - mergeconnection->setDestinationUUID(processoruuid); - processor->addConnection(mergeconnection); - - std::set<core::Relationship> autoTerminatedRelationships; - core::Relationship original("original", ""); - core::Relationship failure("failure", ""); - autoTerminatedRelationships.insert(original); - autoTerminatedRelationships.insert(failure); - 
processor->setAutoTerminatedRelationships(autoTerminatedRelationships); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - logAttributeProcessor->incrementActiveTasks(); - logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + } - std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96"); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag"); - - core::ProcessSession sessionGenFlowFile(context); - std::shared_ptr<core::FlowFile> record[6]; - - // Generate 6 flowfiles, first threes merged to one, second thress merged to one - std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); - std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); - for (int i = 0; i < 6; i++) { - std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); - std::string flowFileName = std::string(FLOW_FILE) + "." 
+ std::to_string(i) + ".txt"; - sessionGenFlowFile.import(flowFileName, flow, true, 0); - flow->setAttribute("tag", "tag"); - record[i] = flow; - } - income_connection->put(record[0]); - income_connection->put(record[1]); - income_connection->put(record[2]); - income_connection->put(record[3]); - income_connection->put(record[4]); - income_connection->put(record[5]); - - REQUIRE(processor->getName() == "mergecontent"); - auto factory = std::make_shared<core::ProcessSessionFactory>(context); - processor->onSchedule(context, factory); - for (int i = 0; i < 6; i++) { - auto session = std::make_shared<core::ProcessSession>(context); - processor->onTrigger(context, session); - session->commit(); - } - // validate the merge content - std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; - std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); - std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords); - REQUIRE(flow1->getSize() == 96); - { - ReadCallback callback(flow1->getSize()); - sessionGenFlowFile.read(flow1, &callback); - std::ifstream file1; - file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in); - std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); - std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); - REQUIRE(expectContents == contents); - file1.close(); - } - REQUIRE(flow2->getSize() == 96); - { - ReadCallback callback(flow2->getSize()); - sessionGenFlowFile.read(flow2, &callback); - std::ifstream file2; - file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in); - std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>()); - std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_); - REQUIRE(expectContents == contents); - file2.close(); - } - LogTestController::getInstance().reset(); - for (int i = 0; i < 6; i++) { - std::string flowFileName = 
std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; - unlink(flowFileName.c_str()); - } - unlink(EXPECT_MERGE_CONTENT_FIRST); - unlink(EXPECT_MERGE_CONTENT_SECOND); - } catch (...) { + MergeTestController testController; + auto context = testController.context; + auto processor = testController.processor; + auto input = testController.input; + auto output = testController.output; + + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96"); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag"); + + core::ProcessSession sessionGenFlowFile(context); + std::shared_ptr<core::FlowFile> record[6]; + + // Generate 6 flowfiles, first threes merged to one, second thress merged to one + for (int i = 0; i < 6; i++) { + std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + std::string flowFileName = std::string(FLOW_FILE) + "." 
+ std::to_string(i) + ".txt"; + sessionGenFlowFile.import(flowFileName, flow, true, 0); + flow->setAttribute("tag", "tag"); + record[i] = flow; + } + input->put(record[0]); + input->put(record[1]); + input->put(record[2]); + input->put(record[3]); + input->put(record[4]); + input->put(record[5]); + + REQUIRE(processor->getName() == "mergecontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + for (int i = 0; i < 6; i++) { + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); } + // validate the merge content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() == 96); + { + FixedBuffer callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + REQUIRE(callback.to_string() == contents); + } + REQUIRE(flow2->getSize() == 96); + { + FixedBuffer callback(flow2->getSize()); + sessionGenFlowFile.read(flow2, &callback); + std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary); + std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>()); + REQUIRE(callback.to_string() == contents); + } + LogTestController::getInstance().reset(); } TEST_CASE("MergeFileTar", "[mergefiletest4]") { - try { - std::ofstream expectfileFirst; - std::ofstream expectfileSecond; - expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST); - expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND); + { + std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary); + std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, 
std::ios::binary); // Create and write to the test file for (int i = 0; i < 6; i++) { std::ofstream tmpfile; std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; - tmpfile.open(flowFileName.c_str()); + tmpfile.open(flowFileName.c_str(), std::ios::binary); for (int j = 0; j < 32; j++) { tmpfile << std::to_string(i); if (i < 3) expectfileFirst << std::to_string(i); else expectfileSecond << std::to_string(i); } - tmpfile.close(); } - expectfileFirst.close(); - expectfileSecond.close(); - - TestController testController; - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); - LogTestController::getInstance().setTrace<core::ProcessSession>(); - LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent"); - std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - processor->initialize(); - utils::Identifier processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - utils::Identifier logAttributeuuid; - REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); - - 
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); - // connection from merge processor to log attribute - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); - connection->addRelationship(core::Relationship("merged", "Merge successful output")); - connection->setSource(processor); - connection->setDestination(logAttributeProcessor); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(logAttributeuuid); - processor->addConnection(connection); - // connection to merge processor - std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection"); - mergeconnection->setDestination(processor); - mergeconnection->setDestinationUUID(processoruuid); - processor->addConnection(mergeconnection); - - std::set<core::Relationship> autoTerminatedRelationships; - core::Relationship original("original", ""); - core::Relationship failure("failure", ""); - autoTerminatedRelationships.insert(original); - autoTerminatedRelationships.insert(failure); - processor->setAutoTerminatedRelationships(autoTerminatedRelationships); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - logAttributeProcessor->incrementActiveTasks(); - logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + } - std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, 
MERGE_FORMAT_TAR_VALUE); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96"); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag"); - - core::ProcessSession sessionGenFlowFile(context); - std::shared_ptr<core::FlowFile> record[6]; - - // Generate 6 flowfiles, first threes merged to one, second thress merged to one - std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); - std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); - for (int i = 0; i < 6; i++) { - std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + MergeTestController testController; + auto context = testController.context; + auto processor = testController.processor; + auto input = testController.input; + auto output = testController.output; + + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_TAR_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96"); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag"); + + core::ProcessSession sessionGenFlowFile(context); + std::shared_ptr<core::FlowFile> record[6]; + + // Generate 6 flowfiles, first threes merged to one, second thress merged to one + for (int i = 0; i < 6; i++) { + std::shared_ptr<core::FlowFile> 
flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); + std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; + sessionGenFlowFile.import(flowFileName, flow, true, 0); + flow->setAttribute("tag", "tag"); + record[i] = flow; + } + input->put(record[0]); + input->put(record[1]); + input->put(record[2]); + input->put(record[3]); + input->put(record[4]); + input->put(record[5]); + + REQUIRE(processor->getName() == "mergecontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); + for (int i = 0; i < 6; i++) { + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); + } + // validate the merge content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords); + REQUIRE(flow1->getSize() > 0); + { + FixedBuffer callback(flow1->getSize()); + sessionGenFlowFile.read(flow1, &callback); + auto archives = read_archives(callback); + REQUIRE(archives.size() == 3); + for (int i = 0; i < 3; i++) { std::string flowFileName = std::string(FLOW_FILE) + "." 
+ std::to_string(i) + ".txt"; - sessionGenFlowFile.import(flowFileName, flow, true, 0); - flow->setAttribute("tag", "tag"); - record[i] = flow; - } - income_connection->put(record[0]); - income_connection->put(record[1]); - income_connection->put(record[2]); - income_connection->put(record[3]); - income_connection->put(record[4]); - income_connection->put(record[5]); - - REQUIRE(processor->getName() == "mergecontent"); - auto factory = std::make_shared<core::ProcessSessionFactory>(context); - processor->onSchedule(context, factory); - for (int i = 0; i < 6; i++) { - auto session = std::make_shared<core::ProcessSession>(context); - processor->onTrigger(context, session); - session->commit(); - } - // validate the merge content - std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; - std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords); - std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords); - REQUIRE(flow1->getSize() > 0); - { - ReadCallback callback(flow1->getSize()); - sessionGenFlowFile.read(flow1, &callback); - callback.archive_read(); - REQUIRE(callback.archive_buffer_num_ == 3); - for (int i = 0; i < 3; i++) { - std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; - std::ifstream file1; - file1.open(flowFileName, std::ios::in); - std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); - std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_[i]), callback.archive_buffer_size_[i]); - REQUIRE(expectContents == contents); - file1.close(); - } - } - REQUIRE(flow2->getSize() > 0); - { - ReadCallback callback(flow2->getSize()); - sessionGenFlowFile.read(flow2, &callback); - callback.archive_read(); - REQUIRE(callback.archive_buffer_num_ == 3); - for (int i = 3; i < 6; i++) { - std::string flowFileName = std::string(FLOW_FILE) + "." 
+ std::to_string(i) + ".txt"; - std::ifstream file1; - file1.open(flowFileName, std::ios::in); - std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); - std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_[i-3]), callback.archive_buffer_size_[i-3]); - REQUIRE(expectContents == contents); - file1.close(); - } + std::ifstream file1(flowFileName, std::ios::binary); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + REQUIRE(archives[i].to_string() == contents); } - LogTestController::getInstance().reset(); - for (int i = 0; i < 6; i++) { + } + REQUIRE(flow2->getSize() > 0); + { + FixedBuffer callback(flow2->getSize()); + sessionGenFlowFile.read(flow2, &callback); + auto archives = read_archives(callback); + REQUIRE(archives.size() == 3); + for (int i = 3; i < 6; i++) { std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt"; - unlink(flowFileName.c_str()); + std::ifstream file1(flowFileName, std::ios::binary); + std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>()); + REQUIRE(archives[i-3].to_string() == contents); } - unlink(EXPECT_MERGE_CONTENT_FIRST); - unlink(EXPECT_MERGE_CONTENT_SECOND); - } catch (...) { } + LogTestController::getInstance().reset(); } TEST_CASE("MergeFileZip", "[mergefiletest5]") { - try { - std::ofstream expectfileFirst; - std::ofstream expectfileSecond; - expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST); - expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND); + { + std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary); + std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary); // Create and write to the test file for (int i = 0; i < 6; i++) { std::ofstream tmpfile; std::string flowFileName = std::string(FLOW_FILE) + "." 
+ std::to_string(i) + ".txt"; - tmpfile.open(flowFileName.c_str()); + tmpfile.open(flowFileName.c_str(), std::ios::binary); for (int j = 0; j < 32; j++) { tmpfile << std::to_string(i); if (i < 3) expectfileFirst << std::to_string(i); else expectfileSecond << std::to_string(i); } - tmpfile.close(); } - expectfileFirst.close(); - expectfileSecond.close(); - - TestController testController; - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); - LogTestController::getInstance().setTrace<core::ProcessSession>(); - LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>(); - LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent"); - std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - processor->initialize(); - utils::Identifier processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - utils::Identifier logAttributeuuid; - REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); - - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - 
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); - // connection from merge processor to log attribute - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); - connection->addRelationship(core::Relationship("merged", "Merge successful output")); - connection->setSource(processor); - connection->setDestination(logAttributeProcessor); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(logAttributeuuid); - processor->addConnection(connection); - // connection to merge processor - std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection"); - mergeconnection->setDestination(processor); - mergeconnection->setDestinationUUID(processoruuid); - processor->addConnection(mergeconnection); - - std::set<core::Relationship> autoTerminatedRelationships; - core::Relationship original("original", ""); - core::Relationship failure("failure", ""); - autoTerminatedRelationships.insert(original); - autoTerminatedRelationships.insert(failure); - processor->setAutoTerminatedRelationships(autoTerminatedRelationships); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - logAttributeProcessor->incrementActiveTasks(); - logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); + } - std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_ZIP_VALUE); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, 
MERGE_STRATEGY_BIN_PACK); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96"); - context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag"); - - core::ProcessSession sessionGenFlowFile(context); - std::shared_ptr<core::FlowFile> record[6]; - - // Generate 6 flowfiles, first threes merged to one, second thress merged to one - std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); - std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); Review comment: It seems to me that I have already moved this one ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org