Repository: nifi-minifi-cpp Updated Branches: refs/heads/master c45f05e51 -> 89b9a1987
MINIFI-182 Added initial event-based scheduler implementation This closes #40. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/89b9a198 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/89b9a198 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/89b9a198 Branch: refs/heads/master Commit: 89b9a1987766ab18196db676b085305689933b0f Parents: c45f05e Author: Andrew Christianson <[email protected]> Authored: Tue Jan 10 19:08:44 2017 +0000 Committer: Aldrin Piri <[email protected]> Committed: Wed Jan 18 15:26:10 2017 -0500 ---------------------------------------------------------------------- libminifi/include/EventDrivenSchedulingAgent.h | 55 ++++++++++ libminifi/include/FlowControlProtocol.h | 8 +- libminifi/include/FlowController.h | 5 +- libminifi/include/ProcessGroup.h | 7 +- libminifi/include/Processor.h | 14 +++ libminifi/include/ThreadedSchedulingAgent.h | 70 ++++++++++++ libminifi/include/TimerDrivenSchedulingAgent.h | 19 +--- libminifi/src/Connection.cpp | 23 ++-- libminifi/src/EventDrivenSchedulingAgent.cpp | 47 ++++++++ libminifi/src/FlowController.cpp | 12 ++- libminifi/src/ProcessGroup.cpp | 14 ++- libminifi/src/Processor.cpp | 61 +++++++++++ libminifi/src/SchedulingAgent.cpp | 3 +- libminifi/src/ThreadedSchedulingAgent.cpp | 113 ++++++++++++++++++++ libminifi/src/TimerDrivenSchedulingAgent.cpp | 99 +---------------- 15 files changed, 418 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/EventDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h new file mode 100644 index 0000000..f6e6ffb --- /dev/null +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -0,0 +1,55 @@ +/** + * @file EventDrivenSchedulingAgent.h + * EventDrivenSchedulingAgent class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __EVENT_DRIVEN_SCHEDULING_AGENT_H__ +#define __EVENT_DRIVEN_SCHEDULING_AGENT_H__ + +#include "Logger.h" +#include "Processor.h" +#include "ProcessContext.h" +#include "ThreadedSchedulingAgent.h" + +//! EventDrivenSchedulingAgent Class +class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent +{ +public: + //! Constructor + /*! + * Create a new processor + */ + EventDrivenSchedulingAgent() + : ThreadedSchedulingAgent() + { + } + //! Destructor + virtual ~EventDrivenSchedulingAgent() + { + } + //! Run function for the thread + void run(Processor *processor); + +private: + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent); + EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent); + +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h index be32e1e..2e8cc72 100644 --- a/libminifi/include/FlowControlProtocol.h +++ b/libminifi/include/FlowControlProtocol.h @@ -304,9 +304,9 @@ private: //! Mutex for protection std::mutex _mtx; //! Logger - Logger *_logger; + Logger *_logger = NULL; //! Configure - Configure *_configure; + Configure *_configure = NULL; //! NiFi server Name std::string _serverName; //! NiFi server port @@ -322,13 +322,13 @@ private: //! seq number uint32_t _seqNumber; //! FlowController - FlowController *_controller; + FlowController *_controller = NULL; //! report Blob char *_reportBlob; //! report Blob len; int _reportBlobLen; //! thread - std::thread *_thread; + std::thread *_thread = NULL; //! whether it is running bool _running; // Prevent default copy constructor and assignment operation http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index ee8bb4f..9635bec 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -45,6 +45,7 @@ #include "LogAttribute.h" #include "RealTimeDataCollector.h" #include "TimerDrivenSchedulingAgent.h" +#include "EventDrivenSchedulingAgent.h" #include "FlowControlProtocol.h" #include "RemoteProcessorGroupPort.h" #include "Provenance.h" @@ -198,8 +199,10 @@ protected: //! Provenance Repo ProvenanceRepository *_provenanceRepo; //! Flow Engines - //! Flow Scheduler + //! Flow Timer Scheduler TimerDrivenSchedulingAgent _timerScheduler; + //! Flow Event Scheduler + EventDrivenSchedulingAgent _eventScheduler; //! Controller Service //! Config //! Site to Site Server Listener http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ProcessGroup.h b/libminifi/include/ProcessGroup.h index 4dd26f8..304cfe6 100644 --- a/libminifi/include/ProcessGroup.h +++ b/libminifi/include/ProcessGroup.h @@ -33,6 +33,7 @@ #include "Processor.h" #include "Exception.h" #include "TimerDrivenSchedulingAgent.h" +#include "EventDrivenSchedulingAgent.h" //! Process Group Type enum ProcessGroupType @@ -111,9 +112,11 @@ public: return false; } //! Start Processing - void startProcessing(TimerDrivenSchedulingAgent *timeScheduler); + void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, + EventDrivenSchedulingAgent *eventScheduler); //! Stop Processing - void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler); + void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, + EventDrivenSchedulingAgent *eventScheduler); //! Whether it is root process group bool isRootProcessGroup(); //! set parent process group http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Processor.h b/libminifi/include/Processor.h index db26ad0..eb3ef9f 100644 --- a/libminifi/include/Processor.h +++ b/libminifi/include/Processor.h @@ -25,9 +25,11 @@ #include <queue> #include <map> #include <mutex> +#include <condition_variable> #include <atomic> #include <algorithm> #include <set> +#include <chrono> #include "TimeUtil.h" #include "Property.h" @@ -278,6 +280,10 @@ public: Connection *getNextIncomingConnection(); //! On Trigger void onTrigger(); + //! Block until work is available on any input connection, or the given duration elapses + void waitForWork(uint64_t timeoutMs); + //! Notify this processor that work may be available + void notifyWork(); public: //! OnTrigger method, implemented by NiFi Processor Designer @@ -334,6 +340,14 @@ private: std::atomic<uint64_t> _yieldExpiration; //! Incoming connection Iterator std::set<Connection *>::iterator _incomingConnectionsIter; + //! Condition for whether there is incoming work to do + bool _hasWork = false; + //! Concurrent condition mutex for whether there is incoming work to do + std::mutex _workAvailableMtx; + //! Concurrent condition variable for whether there is incoming work to do + std::condition_variable _hasWorkCondition; + //! Check all incoming connections for work + bool isWorkAvailable(); //! Logger Logger *_logger; // Prevent default copy constructor and assignment operation http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/ThreadedSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h new file mode 100644 index 0000000..2b14e3d --- /dev/null +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -0,0 +1,70 @@ +/** + * @file ThreadedSchedulingAgent.h + * ThreadedSchedulingAgent class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __THREADED_SCHEDULING_AGENT_H__ +#define __THREADED_SCHEDULING_AGENT_H__ + +#include "Logger.h" +#include "Configure.h" +#include "Processor.h" +#include "ProcessContext.h" +#include "SchedulingAgent.h" + +/** + * An abstract scheduling agent which creates and manages a pool of threads for + * each processor scheduled. + */ +class ThreadedSchedulingAgent : public SchedulingAgent +{ +public: + //! Constructor + /*! + * Create a new processor + */ + ThreadedSchedulingAgent() + : SchedulingAgent() + { + } + //! Destructor + virtual ~ThreadedSchedulingAgent() + { + } + + //! Run function for the thread + virtual void run(Processor *processor) = 0; + +public: + //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent + virtual void schedule(Processor *processor); + //! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent + virtual void unschedule(Processor *processor); + +protected: + //! Threads + std::map<std::string, std::vector<std::thread *>> _threads; + +private: + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent); + ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent); + +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/TimerDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 9195745..7fd86f6 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -21,13 +21,12 @@ #define __TIMER_DRIVEN_SCHEDULING_AGENT_H__ #include "Logger.h" -#include "Configure.h" #include "Processor.h" #include "ProcessContext.h" -#include "SchedulingAgent.h" +#include "ThreadedSchedulingAgent.h" //! TimerDrivenSchedulingAgent Class -class TimerDrivenSchedulingAgent : public SchedulingAgent +class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { public: //! Constructor @@ -35,7 +34,7 @@ public: * Create a new processor */ TimerDrivenSchedulingAgent() - : SchedulingAgent() + : ThreadedSchedulingAgent() { } //! Destructor @@ -43,19 +42,9 @@ public: { } //! Run function for the thread - static void run(TimerDrivenSchedulingAgent *agent, Processor *processor); - -public: - //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent - virtual void schedule(Processor *processor); - //! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent - virtual void unschedule(Processor *processor); - -protected: + void run(Processor *processor); private: - //! Threads - std::map<std::string, std::vector<std::thread *>> _threads; // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index e036b89..7beaf7a 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -28,6 +28,7 @@ #include <iostream> #include "Connection.h" +#include "Processor.h" Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID) : _name(name) @@ -81,14 +82,22 @@ bool Connection::isFull() void Connection::put(FlowFileRecord *flow) { - std::lock_guard<std::mutex> lock(_mtx); - - _queue.push(flow); - - _queuedDataSize += flow->getSize(); + { + std::lock_guard<std::mutex> lock(_mtx); + + _queue.push(flow); + + _queuedDataSize += flow->getSize(); + + _logger->log_debug("Enqueue flow file UUID %s to connection %s", + flow->getUUIDStr().c_str(), _name.c_str()); + } - _logger->log_debug("Enqueue flow file UUID %s to connection %s", - flow->getUUIDStr().c_str(), _name.c_str()); + // Notify receiving processor that work may be available + if(_destProcessor) + { + _destProcessor->notifyWork(); + } } FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/EventDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp new file mode 100644 index 0000000..ed1d8ea --- /dev/null +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -0,0 +1,47 @@ +/** + * @file EventDrivenSchedulingAgent.cpp + * EventDrivenSchedulingAgent class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <chrono> +#include <thread> +#include <iostream> +#include "Property.h" +#include "EventDrivenSchedulingAgent.h" + +void EventDrivenSchedulingAgent::run(Processor *processor) +{ + while (this->_running) + { + bool shouldYield = this->onTrigger(processor); + + if (processor->isYield()) + { + // Honor the yield + std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); + } + else if (shouldYield && this->_boredYieldDuration > 0) + { + // No work to do or need to apply back pressure + std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration)); + } + + // Block until work is available + processor->waitForWork(1000); + } + return; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index caaa8ea..dce9e34 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -117,10 +117,13 @@ void FlowController::stop(bool force) { _logger->log_info("Stop Flow Controller"); this->_timerScheduler.stop(); + this->_eventScheduler.stop(); // Wait for sometime for thread stop std::this_thread::sleep_for(std::chrono::milliseconds(1000)); if (this->_root) - this->_root->stopProcessing(&this->_timerScheduler); + this->_root->stopProcessing( + &this->_timerScheduler, + &this->_eventScheduler); _running = false; } } @@ -621,7 +624,7 @@ void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGro this->parsePortYaml(&currPort, group, RECEIVE); } // for node } - + } } } @@ -1197,8 +1200,11 @@ bool FlowController::start() { if (!_running) { _logger->log_info("Start Flow Controller"); this->_timerScheduler.start(); + this->_eventScheduler.start(); if (this->_root) - this->_root->startProcessing(&this->_timerScheduler); + this->_root->startProcessing( + &this->_timerScheduler, + &this->_eventScheduler); _running = true; this->_protocol->start(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp index 70ee9d7..7c98278 100644 --- a/libminifi/src/ProcessGroup.cpp +++ b/libminifi/src/ProcessGroup.cpp @@ -127,7 +127,8 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) } } -void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler) +void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, + EventDrivenSchedulingAgent *eventScheduler) { std::lock_guard<std::mutex> lock(_mtx); @@ -141,13 +142,15 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler) { if (processor->getSchedulingStrategy() == TIMER_DRIVEN) timeScheduler->schedule(processor); + else if (processor->getSchedulingStrategy() == EVENT_DRIVEN) + eventScheduler->schedule(processor); } } for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) { ProcessGroup *processGroup(*it); - processGroup->startProcessing(timeScheduler); + processGroup->startProcessing(timeScheduler, eventScheduler); } } catch (std::exception &exception) @@ -162,7 +165,8 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler) } } -void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler) +void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, + EventDrivenSchedulingAgent *eventScheduler) { std::lock_guard<std::mutex> lock(_mtx); @@ -174,12 +178,14 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler) Processor *processor(*it); if (processor->getSchedulingStrategy() == TIMER_DRIVEN) timeScheduler->unschedule(processor); + else if (processor->getSchedulingStrategy() == EVENT_DRIVEN) + eventScheduler->unschedule(processor); } for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) { ProcessGroup *processGroup(*it); - processGroup->stopProcessing(timeScheduler); + processGroup->stopProcessing(timeScheduler, eventScheduler); } } catch (std::exception &exception) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp index cc136dc..8da253a 100644 --- a/libminifi/src/Processor.cpp +++ b/libminifi/src/Processor.cpp @@ -449,3 +449,64 @@ void Processor::onTrigger() throw; } } + +void Processor::waitForWork(uint64_t timeoutMs) +{ + std::unique_lock<std::mutex> lock(_workAvailableMtx); + _hasWork = isWorkAvailable(); + + if (!_hasWork) + { + _hasWorkCondition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] { return _hasWork; }); + } + + lock.unlock(); +} + +void Processor::notifyWork() +{ + // Do nothing if we are not event-driven + if (_strategy != EVENT_DRIVEN) + { + return; + } + + { + std::unique_lock<std::mutex> lock(_workAvailableMtx); + _hasWork = isWorkAvailable(); + + // Keep a scope-local copy of the state to avoid race conditions + bool hasWork = _hasWork; + + lock.unlock(); + + if (hasWork) + { + _hasWorkCondition.notify_one(); + } + } +} + +bool Processor::isWorkAvailable() +{ + // We have work if any incoming connection has work + bool hasWork = false; + + try + { + for (const auto &conn : getIncomingConnections()) + { + if (conn->getQueueSize() > 0) + { + hasWork = true; + break; + } + } + } + catch (...) + { + _logger->log_error("Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!"); + } + + return hasWork; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index 211c328..a81cd55 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -82,5 +82,4 @@ bool SchedulingAgent::onTrigger(Processor *processor) } return false; -} - +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/ThreadedSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp new file mode 100644 index 0000000..0338019 --- /dev/null +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -0,0 +1,113 @@ +/** + * @file ThreadedSchedulingAgent.cpp + * ThreadedSchedulingAgent class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <thread> +#include <iostream> + +#include "ThreadedSchedulingAgent.h" + +void ThreadedSchedulingAgent::schedule(Processor *processor) +{ + std::lock_guard<std::mutex> lock(_mtx); + + _administrativeYieldDuration = 0; + std::string yieldValue; + + if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue)) + { + TimeUnit unit; + if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) && + Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration)) + { + _logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration); + } + } + + _boredYieldDuration = 0; + if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue)) + { + TimeUnit unit; + if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) && + Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration)) + { + _logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration); + } + } + + if (processor->getScheduledState() != RUNNING) + { + _logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str()); + return; + } + + std::map<std::string, std::vector<std::thread *>>::iterator it = + _threads.find(processor->getUUIDStr()); + if (it != _threads.end()) + { + _logger->log_info("Can not schedule threads for processor %s because there are existing threads running"); + return; + } + + std::vector<std::thread *> threads; + for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) + { + ThreadedSchedulingAgent *agent = this; + std::thread *thread = new std::thread([agent, processor] () { agent->run(processor); }); + thread->detach(); + threads.push_back(thread); + _logger->log_info("Scheduled thread %d running for process %s", thread->get_id(), + processor->getName().c_str()); + } + _threads[processor->getUUIDStr().c_str()] = threads; + + return; +} + +void ThreadedSchedulingAgent::unschedule(Processor *processor) +{ + std::lock_guard<std::mutex> lock(_mtx); + + _logger->log_info("Shutting down threads for processor %s/%s", + processor->getName().c_str(), + processor->getUUIDStr().c_str()); + + if (processor->getScheduledState() != RUNNING) + { + _logger->log_info("Cannot unschedule threads for processor %s because it is not running", processor->getName().c_str()); + return; + } + + std::map<std::string, std::vector<std::thread *>>::iterator it = + _threads.find(processor->getUUIDStr()); + + if (it == _threads.end()) + { + _logger->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str()); + return; + } + for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) + { + std::thread *thread = *itThread; + _logger->log_info("Scheduled thread %d deleted for process %s", thread->get_id(), + processor->getName().c_str()); + delete thread; + } + _threads.erase(processor->getUUIDStr()); + processor->clearActiveTask(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/TimerDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 3ce57ae..09b7ed6 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -23,112 +23,23 @@ #include "Property.h" #include "TimerDrivenSchedulingAgent.h" -void TimerDrivenSchedulingAgent::schedule(Processor *processor) +void TimerDrivenSchedulingAgent::run(Processor *processor) { - std::lock_guard<std::mutex> lock(_mtx); - - _administrativeYieldDuration = 0; - std::string yieldValue; - - if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue)) + while (this->_running) { - TimeUnit unit; - if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) && - Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration)) - { - _logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration); - } - } - - _boredYieldDuration = 0; - if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue)) - { - TimeUnit unit; - if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) && - Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration)) - { - _logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration); - } - } - - if (processor->getScheduledState() != RUNNING) - { - _logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str()); - return; - } - - std::map<std::string, std::vector<std::thread *>>::iterator it = - _threads.find(processor->getUUIDStr()); - if (it != _threads.end()) - { - _logger->log_info("Can not schedule threads for processor %s because there are existed thread running"); - return; - } - - std::vector<std::thread *> threads; - for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) - { - std::thread *thread = new std::thread(run, this, processor); - thread->detach(); - threads.push_back(thread); - _logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(), - processor->getName().c_str()); - } - _threads[processor->getUUIDStr().c_str()] = threads; - - return; -} - -void TimerDrivenSchedulingAgent::unschedule(Processor *processor) -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (processor->getScheduledState() != RUNNING) - { - _logger->log_info("Can not unschedule threads for processor %s because it is not running", processor->getName().c_str()); - return; - } - - std::map<std::string, std::vector<std::thread *>>::iterator it = - _threads.find(processor->getUUIDStr()); - - if (it == _threads.end()) - { - _logger->log_info("Can not unschedule threads for processor %s because there are no existed thread running"); - return; - } - for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) - { - std::thread *thread = *itThread; - _logger->log_info("Scheduled Time Driven thread %d deleted for process %s", thread->get_id(), - processor->getName().c_str()); - delete thread; - } - _threads.erase(processor->getUUIDStr()); - processor->clearActiveTask(); - - return; -} - -void TimerDrivenSchedulingAgent::run(TimerDrivenSchedulingAgent *agent, Processor *processor) -{ - while (agent->_running) - { - bool shouldYield = agent->onTrigger(processor); + bool shouldYield = this->onTrigger(processor); if (processor->isYield()) { // Honor the yield std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); } - else if (shouldYield && agent->_boredYieldDuration > 0) + else if (shouldYield && this->_boredYieldDuration > 0) { // No work to do or need to apply back pressure - std::this_thread::sleep_for(std::chrono::milliseconds(agent->_boredYieldDuration)); + std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration)); } std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); } return; } - -
