adamdebreceni commented on a change in pull request #804:
URL: https://github.com/apache/nifi-minifi-cpp/pull/804#discussion_r437399244
##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -252,3 +252,63 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
LogTestController::getInstance().reset();
}
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
+ TestController testController;
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+ LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+ char format[] = "/var/tmp/testRepo.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "content_repository"));
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+ std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::repository::FlowFileRepository> ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+ std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::FileSystemRepository>();
+ ff_repository->initialize(config);
+ content_repo->initialize(config);
+
+ std::shared_ptr<minifi::Connection> input =
std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+ auto root =
std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP,
"root");
+ root->addConnection(input);
+
+ auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new
core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr,
config, "")};
+ auto flowController = std::make_shared<minifi::FlowController>(prov_repo,
ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+ std::string data = "banana";
+ minifi::io::DataStream content(reinterpret_cast<const
uint8_t*>(data.c_str()), data.length());
+
+ {
+ std::shared_ptr<core::Processor> processor =
std::make_shared<core::Processor>("dummy");
+ std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
+ std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
+ auto context = std::make_shared<core::ProcessContext>(node,
controller_services_provider, prov_repo, ff_repository, content_repo);
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow =
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+ sessionGenFlowFile.importFrom(content, flow);
+ input->put(flow); // stores it in the flowFileRepository
+ }
+
+ // remove flow from the connection but it is still present in the
+ // flowFileRepo
+ std::set<std::shared_ptr<core::FlowFile>> expiredFiles;
+ auto oldFlow = input->poll(expiredFiles);
+ REQUIRE(oldFlow);
+ REQUIRE(expiredFiles.empty());
+
+ flowController->load(root);
+ ff_repository->start();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds{500});
+
+ auto newFlow = input->poll(expiredFiles);
+ REQUIRE(newFlow);
+ REQUIRE(expiredFiles.empty());
+
+ LogTestController::getInstance().reset();
+}
Review comment:
at the end of the file?
##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -252,3 +252,63 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
LogTestController::getInstance().reset();
}
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
+ TestController testController;
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+ LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+ char format[] = "/var/tmp/testRepo.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "content_repository"));
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+ std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::repository::FlowFileRepository> ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+ std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::FileSystemRepository>();
+ ff_repository->initialize(config);
+ content_repo->initialize(config);
+
+ std::shared_ptr<minifi::Connection> input =
std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+ auto root =
std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP,
"root");
+ root->addConnection(input);
+
+ auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new
core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr,
config, "")};
+ auto flowController = std::make_shared<minifi::FlowController>(prov_repo,
ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+ std::string data = "banana";
+ minifi::io::DataStream content(reinterpret_cast<const
uint8_t*>(data.c_str()), data.length());
+
+ {
+ std::shared_ptr<core::Processor> processor =
std::make_shared<core::Processor>("dummy");
+ std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
+ std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
+ auto context = std::make_shared<core::ProcessContext>(node,
controller_services_provider, prov_repo, ff_repository, content_repo);
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow =
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+ sessionGenFlowFile.importFrom(content, flow);
+ input->put(flow); // stores it in the flowFileRepository
+ }
+
+ // remove flow from the connection but it is still present in the
+ // flowFileRepo
+ std::set<std::shared_ptr<core::FlowFile>> expiredFiles;
+ auto oldFlow = input->poll(expiredFiles);
+ REQUIRE(oldFlow);
+ REQUIRE(expiredFiles.empty());
+
+ flowController->load(root);
+ ff_repository->start();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds{500});
+
+ auto newFlow = input->poll(expiredFiles);
Review comment:
added some comments
##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -252,3 +252,77 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
LogTestController::getInstance().reset();
}
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
+ TestController testController;
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+ LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+ char format[] = "/var/tmp/testRepo.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "content_repository"));
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+ std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::repository::FlowFileRepository> ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+ std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::FileSystemRepository>();
+ ff_repository->initialize(config);
+ content_repo->initialize(config);
+
+ std::shared_ptr<minifi::Connection> input =
std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+ auto root =
std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP,
"root");
+ root->addConnection(input);
+
+ auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new
core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr,
config, "")};
+ auto flowController = std::make_shared<minifi::FlowController>(prov_repo,
ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+ std::string data = "banana";
+ minifi::io::DataStream content(reinterpret_cast<const
uint8_t*>(data.c_str()), data.length());
+
+ /**
+ * Currently it is the Connection's responsibility to persist the incoming
+ * flowFiles to the FlowFileRepository. Upon restart the FlowFileRepository
+ * checks the persisted database and moves every FlowFile into the Connection
+ * that persisted it (if it can find it. We could have a different flow, in
+ * which case the bastard FlowFiles are deleted.)
Review comment:
corrected
##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -252,3 +252,63 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
LogTestController::getInstance().reset();
}
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
+ TestController testController;
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+ LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+ char format[] = "/var/tmp/testRepo.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "content_repository"));
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+ std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::repository::FlowFileRepository> ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+ std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::FileSystemRepository>();
+ ff_repository->initialize(config);
+ content_repo->initialize(config);
+
+ std::shared_ptr<minifi::Connection> input =
std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+ auto root =
std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP,
"root");
+ root->addConnection(input);
+
+ auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new
core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr,
config, "")};
+ auto flowController = std::make_shared<minifi::FlowController>(prov_repo,
ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+ std::string data = "banana";
+ minifi::io::DataStream content(reinterpret_cast<const
uint8_t*>(data.c_str()), data.length());
+
+ {
+ std::shared_ptr<core::Processor> processor =
std::make_shared<core::Processor>("dummy");
+ std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
+ std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
+ auto context = std::make_shared<core::ProcessContext>(node,
controller_services_provider, prov_repo, ff_repository, content_repo);
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow =
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+ sessionGenFlowFile.importFrom(content, flow);
+ input->put(flow); // stores it in the flowFileRepository
+ }
+
+ // remove flow from the connection but it is still present in the
+ // flowFileRepo
+ std::set<std::shared_ptr<core::FlowFile>> expiredFiles;
+ auto oldFlow = input->poll(expiredFiles);
+ REQUIRE(oldFlow);
+ REQUIRE(expiredFiles.empty());
+
+ flowController->load(root);
+ ff_repository->start();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds{500});
+
+ auto newFlow = input->poll(expiredFiles);
+ REQUIRE(newFlow);
+ REQUIRE(expiredFiles.empty());
+
+ LogTestController::getInstance().reset();
+}
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org