arpadboda commented on a change in pull request #804:
URL: https://github.com/apache/nifi-minifi-cpp/pull/804#discussion_r437361382



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -252,3 +252,63 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
   LogTestController::getInstance().reset();
 }
 
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+  char format[] = "/var/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "content_repository"));
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::FileSystemRepository>();
+  ff_repository->initialize(config);
+  content_repo->initialize(config);
+
+  std::shared_ptr<minifi::Connection> input = 
std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+  auto root = 
std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP,
 "root");
+  root->addConnection(input);
+
+  auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new 
core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, 
config, "")};
+  auto flowController = std::make_shared<minifi::FlowController>(prov_repo, 
ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+  std::string data = "banana";
+  minifi::io::DataStream content(reinterpret_cast<const 
uint8_t*>(data.c_str()), data.length());
+
+  {
+    std::shared_ptr<core::Processor> processor = 
std::make_shared<core::Processor>("dummy");
+    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, prov_repo, ff_repository, content_repo);
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::FlowFile> flow = 
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+    sessionGenFlowFile.importFrom(content, flow);
+    input->put(flow); // stores it in the flowFileRepository
+  }
+
+  // remove flow from the connection but it is still present in the
+  // flowFileRepo
+  std::set<std::shared_ptr<core::FlowFile>> expiredFiles;
+  auto oldFlow = input->poll(expiredFiles);
+  REQUIRE(oldFlow);
+  REQUIRE(expiredFiles.empty());
+
+  flowController->load(root);
+  ff_repository->start();
+
+  std::this_thread::sleep_for(std::chrono::milliseconds{500});
+
+  auto newFlow = input->poll(expiredFiles);

Review comment:
       Could you comment some reason behind this?
   I mean something like responsibility of deleting the pulled flowfile from 
repo, why it's kept there in the current case. 

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -252,3 +252,63 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
   LogTestController::getInstance().reset();
 }
 
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+  char format[] = "/var/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "content_repository"));
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::FileSystemRepository>();
+  ff_repository->initialize(config);
+  content_repo->initialize(config);
+
+  std::shared_ptr<minifi::Connection> input = 
std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+  auto root = 
std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP,
 "root");
+  root->addConnection(input);
+
+  auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new 
core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, 
config, "")};
+  auto flowController = std::make_shared<minifi::FlowController>(prov_repo, 
ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+  std::string data = "banana";
+  minifi::io::DataStream content(reinterpret_cast<const 
uint8_t*>(data.c_str()), data.length());
+
+  {
+    std::shared_ptr<core::Processor> processor = 
std::make_shared<core::Processor>("dummy");
+    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, prov_repo, ff_repository, content_repo);
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::FlowFile> flow = 
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+    sessionGenFlowFile.importFrom(content, flow);
+    input->put(flow); // stores it in the flowFileRepository
+  }
+
+  // remove flow from the connection but it is still present in the
+  // flowFileRepo
+  std::set<std::shared_ptr<core::FlowFile>> expiredFiles;
+  auto oldFlow = input->poll(expiredFiles);
+  REQUIRE(oldFlow);
+  REQUIRE(expiredFiles.empty());
+
+  flowController->load(root);
+  ff_repository->start();
+
+  std::this_thread::sleep_for(std::chrono::milliseconds{500});
+
+  auto newFlow = input->poll(expiredFiles);
+  REQUIRE(newFlow);
+  REQUIRE(expiredFiles.empty());
+
+  LogTestController::getInstance().reset();
+}

Review comment:
       Nitpicking: missing newline

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -252,3 +252,63 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
   LogTestController::getInstance().reset();
 }
 
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+  char format[] = "/var/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "content_repository"));
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::FileSystemRepository>();
+  ff_repository->initialize(config);
+  content_repo->initialize(config);
+
+  std::shared_ptr<minifi::Connection> input = 
std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+  auto root = 
std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP,
 "root");
+  root->addConnection(input);
+
+  auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new 
core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, 
config, "")};
+  auto flowController = std::make_shared<minifi::FlowController>(prov_repo, 
ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+  std::string data = "banana";
+  minifi::io::DataStream content(reinterpret_cast<const 
uint8_t*>(data.c_str()), data.length());
+
+  {
+    std::shared_ptr<core::Processor> processor = 
std::make_shared<core::Processor>("dummy");
+    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, prov_repo, ff_repository, content_repo);
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::FlowFile> flow = 
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+    sessionGenFlowFile.importFrom(content, flow);
+    input->put(flow); // stores it in the flowFileRepository
+  }
+
+  // remove flow from the connection but it is still present in the
+  // flowFileRepo
+  std::set<std::shared_ptr<core::FlowFile>> expiredFiles;
+  auto oldFlow = input->poll(expiredFiles);
+  REQUIRE(oldFlow);
+  REQUIRE(expiredFiles.empty());
+
+  flowController->load(root);
+  ff_repository->start();
+
+  std::this_thread::sleep_for(std::chrono::milliseconds{500});
+
+  auto newFlow = input->poll(expiredFiles);
+  REQUIRE(newFlow);
+  REQUIRE(expiredFiles.empty());
+
+  LogTestController::getInstance().reset();
+}

Review comment:
       Yes, I think linter would complain. The file might not be linted atm. 

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -252,3 +252,63 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
   LogTestController::getInstance().reset();
 }
 
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+  char format[] = "/var/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "content_repository"));
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::FileSystemRepository>();
+  ff_repository->initialize(config);
+  content_repo->initialize(config);
+
+  std::shared_ptr<minifi::Connection> input = 
std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+  auto root = 
std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP,
 "root");
+  root->addConnection(input);
+
+  auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new 
core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, 
config, "")};
+  auto flowController = std::make_shared<minifi::FlowController>(prov_repo, 
ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+  std::string data = "banana";
+  minifi::io::DataStream content(reinterpret_cast<const 
uint8_t*>(data.c_str()), data.length());
+
+  {
+    std::shared_ptr<core::Processor> processor = 
std::make_shared<core::Processor>("dummy");
+    std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, prov_repo, ff_repository, content_repo);
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::FlowFile> flow = 
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+    sessionGenFlowFile.importFrom(content, flow);
+    input->put(flow); // stores it in the flowFileRepository
+  }
+
+  // remove flow from the connection but it is still present in the
+  // flowFileRepo
+  std::set<std::shared_ptr<core::FlowFile>> expiredFiles;
+  auto oldFlow = input->poll(expiredFiles);
+  REQUIRE(oldFlow);
+  REQUIRE(expiredFiles.empty());
+
+  flowController->load(root);
+  ff_repository->start();
+
+  std::this_thread::sleep_for(std::chrono::milliseconds{500});
+
+  auto newFlow = input->poll(expiredFiles);

Review comment:
       Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to