Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 9f161a27e -> 1a2fa1ea5
MINIFICPP-217: Resolve compilation failures within Bin files and merge files test This closes #140. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/1a2fa1ea Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/1a2fa1ea Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/1a2fa1ea Branch: refs/heads/master Commit: 1a2fa1ea52389a821c6f5f56bf6e4b1a352d7f16 Parents: 9f161a2 Author: Marc Parisi <[email protected]> Authored: Mon Oct 2 11:36:52 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Mon Oct 2 11:54:59 2017 -0400 ---------------------------------------------------------------------- libminifi/include/processors/BinFiles.h | 45 +++++++---- libminifi/src/processors/BinFiles.cpp | 12 +-- libminifi/test/unit/MergeFileTests.cpp | 116 +++++++++++++-------------- 3 files changed, 92 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a2fa1ea/libminifi/include/processors/BinFiles.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/BinFiles.h b/libminifi/include/processors/BinFiles.h index 6c619a8..69399a3 100644 --- a/libminifi/include/processors/BinFiles.h +++ b/libminifi/include/processors/BinFiles.h @@ -41,13 +41,17 @@ namespace processors { class Bin { public: // Constructor - /*! - * Create a new Bin. Note: this object is not thread safe - */ - explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const int &minEntries, const int & maxEntries, - const std::string &fileCount, const std::string &groupId) - : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount), - groupId_(groupId), logger_(logging::LoggerFactory<Bin>::getLogger()) { + /*! + * Create a new Bin. Note: this object is not thread safe + */ + explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const int &minEntries, const int & maxEntries, const std::string &fileCount, const std::string &groupId) + : minSize_(minSize), + maxSize_(maxSize), + maxEntries_(maxEntries), + minEntries_(minEntries), + fileCount_(fileCount), + groupId_(groupId), + logger_(logging::LoggerFactory<Bin>::getLogger()) { queued_data_size_ = 0; creation_dated_ = getTimeMillis(); std::shared_ptr<utils::IdGenerator> id_generator = utils::IdGenerator::getIdGenerator(); @@ -103,8 +107,7 @@ class Bin { queue_.push_back(flow); queued_data_size_ += flow->getSize(); - logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d", - uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_); + logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d", uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_); return true; } @@ -132,7 +135,7 @@ class Bin { int minEntries_; // Queued data size uint64_t queued_data_size_; - // Queue for the Flow File + // Queue for the Flow File std::deque<std::shared_ptr<core::FlowFile>> queue_; uint64_t creation_dated_; std::string fileCount_; @@ -148,11 +151,16 @@ class Bin { class BinManager { public: // Constructor - /*! - * Create a new BinManager - */ + /*! + * Create a new BinManager + */ BinManager() - : minSize_(0), maxSize_(ULLONG_MAX), maxEntries_(INT_MAX), minEntries_(1), binAge_(ULLONG_MAX), binCount_(0), + : minSize_(0), + maxSize_(ULLONG_MAX), + maxEntries_(INT_MAX), + minEntries_(1), + binAge_(ULLONG_MAX), + binCount_(0), logger_(logging::LoggerFactory<BinManager>::getLogger()) { } virtual ~BinManager() { @@ -180,7 +188,7 @@ class BinManager { fileCount_ = value; } void purge() { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); groupBinMap_.clear(); binCount_ = 0; } @@ -204,7 +212,7 @@ class BinManager { std::string fileCount_; // Bin Age in msec uint64_t binAge_; - std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>>> groupBinMap_; + std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>> >groupBinMap_; std::deque<std::unique_ptr<Bin>> readyBin_; int binCount_; std::shared_ptr<logging::Logger> logger_; @@ -258,7 +266,10 @@ class BinFiles : public core::Processor { */ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); // OnTrigger method, implemented by NiFi BinFiles - virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + } + // OnTrigger method, implemented by NiFi BinFiles + virtual void onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession> session); // Initialize, over write by NiFi BinFiles virtual void initialize(void); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a2fa1ea/libminifi/src/processors/BinFiles.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/BinFiles.cpp b/libminifi/src/processors/BinFiles.cpp index bd4afca..5d118a2 100644 --- a/libminifi/src/processors/BinFiles.cpp +++ b/libminifi/src/processors/BinFiles.cpp @@ -233,12 +233,12 @@ bool BinManager::offer(const std::string &group, std::shared_ptr<core::FlowFile> return true; } -void BinFiles::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { +void BinFiles::onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession> session) { std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast < FlowFileRecord > (session->get()); if (flow != nullptr) { - preprocessFlowFile(context, session, flow); - std::string groupId = getGroupId(context, flow); + preprocessFlowFile(context.get(), session.get(), flow); + std::string groupId = getGroupId(context.get(), flow); bool offer = this->binManager_.offer(groupId, flow); if (!offer) { @@ -272,10 +272,10 @@ void BinFiles::onTrigger(core::ProcessContext *context, core::ProcessSession *se std::unique_ptr<Bin> bin = std::move(readyBins.front()); readyBins.pop_front(); // add bin's flows to the session - this->addFlowsToSession(context, &mergeSession, bin); + this->addFlowsToSession(context.get(), &mergeSession, bin); logger_->log_info("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId()); - if (!this->processBin(context, &mergeSession, bin)) - this->transferFlowsToFail(context, &mergeSession, bin); + if (!this->processBin(context.get(), &mergeSession, bin)) + this->transferFlowsToFail(context.get(), &mergeSession, bin); } mergeSession.commit(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a2fa1ea/libminifi/test/unit/MergeFileTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/MergeFileTests.cpp b/libminifi/test/unit/MergeFileTests.cpp index d3e4fa9..cf09df1 100644 --- a/libminifi/test/unit/MergeFileTests.cpp +++ b/libminifi/test/unit/MergeFileTests.cpp @@ -143,18 +143,18 @@ TEST_CASE("MergeFileDefragment", "[mergefiletest1]") { logAttributeProcessor->incrementActiveTasks(); logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); - core::ProcessorNode node(processor); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); - core::ProcessSession sessionGenFlowFile(&context); + core::ProcessSession sessionGenFlowFile(context); std::shared_ptr<core::FlowFile> record[6]; // Generate 6 flowfiles, first threes merged to one, second thress merged to one - std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection(); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); for (int i = 0; i < 6; i++) { std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); @@ -180,12 +180,12 @@ TEST_CASE("MergeFileDefragment", "[mergefiletest1]") { income_connection->put(record[3]); REQUIRE(processor->getName() == "mergecontent"); - core::ProcessSessionFactory factory(&context); - processor->onSchedule(&context, &factory); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); for (int i = 0; i < 6; i++) { - core::ProcessSession session(&context); - processor->onTrigger(&context, &session); - session.commit(); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); } // validate the merge content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; @@ -315,21 +315,21 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") { logAttributeProcessor->incrementActiveTasks(); logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); - core::ProcessorNode node(processor); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, "/tmp/minifi-mergecontent.header"); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, "/tmp/minifi-mergecontent.footer"); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, "/tmp/minifi-mergecontent.demarcator"); - - core::ProcessSession sessionGenFlowFile(&context); + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, "/tmp/minifi-mergecontent.header"); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, "/tmp/minifi-mergecontent.footer"); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, "/tmp/minifi-mergecontent.demarcator"); + + core::ProcessSession sessionGenFlowFile(context); std::shared_ptr<core::FlowFile> record[6]; // Generate 6 flowfiles, first threes merged to one, second thress merged to one - std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection(); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); for (int i = 0; i < 6; i++) { std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); @@ -355,12 +355,12 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") { income_connection->put(record[3]); REQUIRE(processor->getName() == "mergecontent"); - core::ProcessSessionFactory factory(&context); - processor->onSchedule(&context, &factory); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); for (int i = 0; i < 6; i++) { - core::ProcessSession session(&context); - processor->onTrigger(&context, &session); - session.commit(); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); } // validate the merge content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; @@ -478,19 +478,19 @@ TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") { logAttributeProcessor->incrementActiveTasks(); logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); - core::ProcessorNode node(processor); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec"); + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec"); - core::ProcessSession sessionGenFlowFile(&context); + core::ProcessSession sessionGenFlowFile(context); std::shared_ptr<core::FlowFile> record[6]; // Generate 6 flowfiles, first threes merged to one, second thress merged to one - std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection(); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); for (int i = 0; i < 6; i++) { if (i == 4) @@ -517,19 +517,19 @@ TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") { income_connection->put(record[3]); REQUIRE(processor->getName() == "mergecontent"); - core::ProcessSessionFactory factory(&context); - processor->onSchedule(&context, &factory); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); for (int i = 0; i < 6; i++) { if (i == 4) continue; - core::ProcessSession session(&context); - processor->onTrigger(&context, &session); - session.commit(); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); } std::this_thread::sleep_for(std::chrono::milliseconds(2000)); { - core::ProcessSession session(&context); - processor->onTrigger(&context, &session); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); } // validate the merge content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; @@ -643,20 +643,20 @@ TEST_CASE("MergeFileBinPack", "[mergefiletest4]") { logAttributeProcessor->incrementActiveTasks(); logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING); - core::ProcessorNode node(processor); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96"); - context.setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag"); - - core::ProcessSession sessionGenFlowFile(&context); + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96"); + context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag"); + + core::ProcessSession sessionGenFlowFile(context); std::shared_ptr<core::FlowFile> record[6]; // Generate 6 flowfiles, first threes merged to one, second thress merged to one - std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection(); + std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection(); std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income); for (int i = 0; i < 6; i++) { std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create()); @@ -673,12 +673,12 @@ TEST_CASE("MergeFileBinPack", "[mergefiletest4]") { income_connection->put(record[5]); REQUIRE(processor->getName() == "mergecontent"); - core::ProcessSessionFactory factory(&context); - processor->onSchedule(&context, &factory); + auto factory = std::make_shared<core::ProcessSessionFactory>(context); + processor->onSchedule(context, factory); for (int i = 0; i < 6; i++) { - core::ProcessSession session(&context); - processor->onTrigger(&context, &session); - session.commit(); + auto session = std::make_shared<core::ProcessSession>(context); + processor->onTrigger(context, session); + session->commit(); } // validate the merge content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
