Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master c45f05e51 -> 89b9a1987


MINIFI-182 Added initial event-based scheduler implementation

This closes #40.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/89b9a198
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/89b9a198
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/89b9a198

Branch: refs/heads/master
Commit: 89b9a1987766ab18196db676b085305689933b0f
Parents: c45f05e
Author: Andrew Christianson <[email protected]>
Authored: Tue Jan 10 19:08:44 2017 +0000
Committer: Aldrin Piri <[email protected]>
Committed: Wed Jan 18 15:26:10 2017 -0500

----------------------------------------------------------------------
 libminifi/include/EventDrivenSchedulingAgent.h |  55 ++++++++++
 libminifi/include/FlowControlProtocol.h        |   8 +-
 libminifi/include/FlowController.h             |   5 +-
 libminifi/include/ProcessGroup.h               |   7 +-
 libminifi/include/Processor.h                  |  14 +++
 libminifi/include/ThreadedSchedulingAgent.h    |  70 ++++++++++++
 libminifi/include/TimerDrivenSchedulingAgent.h |  19 +---
 libminifi/src/Connection.cpp                   |  23 ++--
 libminifi/src/EventDrivenSchedulingAgent.cpp   |  47 ++++++++
 libminifi/src/FlowController.cpp               |  12 ++-
 libminifi/src/ProcessGroup.cpp                 |  14 ++-
 libminifi/src/Processor.cpp                    |  61 +++++++++++
 libminifi/src/SchedulingAgent.cpp              |   3 +-
 libminifi/src/ThreadedSchedulingAgent.cpp      | 113 ++++++++++++++++++++
 libminifi/src/TimerDrivenSchedulingAgent.cpp   |  99 +----------------
 15 files changed, 418 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h 
b/libminifi/include/EventDrivenSchedulingAgent.h
new file mode 100644
index 0000000..f6e6ffb
--- /dev/null
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -0,0 +1,55 @@
+/**
+ * @file EventDrivenSchedulingAgent.h
+ * EventDrivenSchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EVENT_DRIVEN_SCHEDULING_AGENT_H__
+#define __EVENT_DRIVEN_SCHEDULING_AGENT_H__
+
+#include "Logger.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "ThreadedSchedulingAgent.h"
+
+//! EventDrivenSchedulingAgent Class: threaded scheduling agent for event-driven processors
+class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new event-driven scheduling agent
+        */
+       EventDrivenSchedulingAgent()
+       : ThreadedSchedulingAgent()
+       {
+       }
+       //! Destructor
+       virtual ~EventDrivenSchedulingAgent()
+       {
+       }
+       //! Worker thread body: triggers the processor, then waits for incoming work
+       void run(Processor *processor) override;
+
+private:
+       // Copying is disabled; pass agents by reference or pointer only.
+       // Deleted (not just left undefined) so misuse inside the class fails at compile time too.
+       EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent) = delete;
+       EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent) = delete;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowControlProtocol.h 
b/libminifi/include/FlowControlProtocol.h
index be32e1e..2e8cc72 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -304,9 +304,9 @@ private:
        //! Mutex for protection
        std::mutex _mtx;
        //! Logger
-       Logger *_logger;
+       Logger *_logger = NULL;
        //! Configure
-       Configure *_configure;
+       Configure *_configure = NULL;
        //! NiFi server Name
        std::string _serverName;
        //! NiFi server port
@@ -322,13 +322,13 @@ private:
        //! seq number
        uint32_t _seqNumber;
        //! FlowController
-       FlowController *_controller;
+       FlowController *_controller = NULL;
        //! report Blob
        char *_reportBlob;
        //! report Blob len;
        int _reportBlobLen;
        //! thread
-       std::thread *_thread;
+       std::thread *_thread = NULL;
        //! whether it is running
        bool _running;
        // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index ee8bb4f..9635bec 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -45,6 +45,7 @@
 #include "LogAttribute.h"
 #include "RealTimeDataCollector.h"
 #include "TimerDrivenSchedulingAgent.h"
+#include "EventDrivenSchedulingAgent.h"
 #include "FlowControlProtocol.h"
 #include "RemoteProcessorGroupPort.h"
 #include "Provenance.h"
@@ -198,8 +199,10 @@ protected:
        //! Provenance Repo
        ProvenanceRepository *_provenanceRepo;
        //! Flow Engines
-       //! Flow Scheduler
+       //! Flow Timer Scheduler
        TimerDrivenSchedulingAgent _timerScheduler;
+       //! Flow Event Scheduler
+       EventDrivenSchedulingAgent _eventScheduler;
        //! Controller Service
        //! Config
        //! Site to Site Server Listener

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessGroup.h b/libminifi/include/ProcessGroup.h
index 4dd26f8..304cfe6 100644
--- a/libminifi/include/ProcessGroup.h
+++ b/libminifi/include/ProcessGroup.h
@@ -33,6 +33,7 @@
 #include "Processor.h"
 #include "Exception.h"
 #include "TimerDrivenSchedulingAgent.h"
+#include "EventDrivenSchedulingAgent.h"
 
 //! Process Group Type
 enum ProcessGroupType
@@ -111,9 +112,11 @@ public:
                        return false;
        }
        //! Start Processing
-       void startProcessing(TimerDrivenSchedulingAgent *timeScheduler);
+       void startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+                       EventDrivenSchedulingAgent *eventScheduler);
        //! Stop Processing
-       void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler);
+       void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+                       EventDrivenSchedulingAgent *eventScheduler);
        //! Whether it is root process group
        bool isRootProcessGroup();
        //! set parent process group

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Processor.h b/libminifi/include/Processor.h
index db26ad0..eb3ef9f 100644
--- a/libminifi/include/Processor.h
+++ b/libminifi/include/Processor.h
@@ -25,9 +25,11 @@
 #include <queue>
 #include <map>
 #include <mutex>
+#include <condition_variable>
 #include <atomic>
 #include <algorithm>
 #include <set>
+#include <chrono>
 
 #include "TimeUtil.h"
 #include "Property.h"
@@ -278,6 +280,10 @@ public:
        Connection *getNextIncomingConnection();
        //! On Trigger
        void onTrigger();
+       //! Block until work is available on any input connection, or the given 
duration elapses
+       void waitForWork(uint64_t timeoutMs);
+       //! Notify this processor that work may be available
+       void notifyWork();
 
 public:
        //! OnTrigger method, implemented by NiFi Processor Designer
@@ -334,6 +340,14 @@ private:
        std::atomic<uint64_t> _yieldExpiration;
        //! Incoming connection Iterator
        std::set<Connection *>::iterator _incomingConnectionsIter;
+       //! Condition for whether there is incoming work to do
+       bool _hasWork = false;
+       //! Concurrent condition mutex for whether there is incoming work to do
+       std::mutex _workAvailableMtx;
+       //! Concurrent condition variable for whether there is incoming work to 
do
+       std::condition_variable _hasWorkCondition;
+       //! Check all incoming connections for work
+       bool isWorkAvailable();
        //! Logger
        Logger *_logger;
        // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h 
b/libminifi/include/ThreadedSchedulingAgent.h
new file mode 100644
index 0000000..2b14e3d
--- /dev/null
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -0,0 +1,70 @@
+/**
+ * @file ThreadedSchedulingAgent.h
+ * ThreadedSchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __THREADED_SCHEDULING_AGENT_H__
+#define __THREADED_SCHEDULING_AGENT_H__
+
+#include "Logger.h"
+#include "Configure.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "SchedulingAgent.h"
+
+/**
+ * An abstract scheduling agent which creates and manages a pool of threads for
+ * each processor scheduled.
+ */
+class ThreadedSchedulingAgent : public SchedulingAgent
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new threaded scheduling agent
+        */
+       ThreadedSchedulingAgent()
+       : SchedulingAgent()
+       {
+       }
+       //! Destructor
+       virtual ~ThreadedSchedulingAgent()
+       {
+       }
+
+       //! Body of each worker thread; implemented by the concrete scheduling agent
+       virtual void run(Processor *processor) = 0;
+
+public:
+       //! schedule: start one worker thread per concurrent task of the processor
+       virtual void schedule(Processor *processor);
+       //! unschedule: delete the worker thread handles recorded for the processor
+       virtual void unschedule(Processor *processor);
+
+protected:
+       //! Worker thread handles, keyed by processor UUID string
+       std::map<std::string, std::vector<std::thread *>> _threads;
+
+private:
+       // Copying is disabled; pass agents by reference or pointer only.
+       // Deleted (not just left undefined) so misuse inside the class fails at compile time too.
+       ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent) = delete;
+       ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent) = delete;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h 
b/libminifi/include/TimerDrivenSchedulingAgent.h
index 9195745..7fd86f6 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -21,13 +21,12 @@
 #define __TIMER_DRIVEN_SCHEDULING_AGENT_H__
 
 #include "Logger.h"
-#include "Configure.h"
 #include "Processor.h"
 #include "ProcessContext.h"
-#include "SchedulingAgent.h"
+#include "ThreadedSchedulingAgent.h"
 
 //! TimerDrivenSchedulingAgent Class
-class TimerDrivenSchedulingAgent : public SchedulingAgent
+class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent
 {
 public:
        //! Constructor
@@ -35,7 +34,7 @@ public:
         * Create a new processor
         */
        TimerDrivenSchedulingAgent()
-       : SchedulingAgent()
+       : ThreadedSchedulingAgent()
        {
        }
        //! Destructor
@@ -43,19 +42,9 @@ public:
        {
        }
        //! Run function for the thread
-       static void run(TimerDrivenSchedulingAgent *agent, Processor 
*processor);
-
-public:
-       //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
-       virtual void schedule(Processor *processor);
-       //! unschedule, overwritten by different 
DrivenTimerDrivenSchedulingAgent
-       virtual void unschedule(Processor *processor);
-
-protected:
+       void run(Processor *processor);
 
 private:
-       //! Threads
-       std::map<std::string, std::vector<std::thread *>> _threads;
        // Prevent default copy constructor and assignment operation
        // Only support pass by reference or pointer
        TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index e036b89..7beaf7a 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -28,6 +28,7 @@
 #include <iostream>
 
 #include "Connection.h"
+#include "Processor.h"
 
 Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t 
destUUID)
 : _name(name)
@@ -81,14 +82,22 @@ bool Connection::isFull()
 
 void Connection::put(FlowFileRecord *flow)
 {
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _queue.push(flow);
-
-       _queuedDataSize += flow->getSize();
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+       
+               _queue.push(flow);
+       
+               _queuedDataSize += flow->getSize();
+       
+               _logger->log_debug("Enqueue flow file UUID %s to connection %s",
+                               flow->getUUIDStr().c_str(), _name.c_str());
+       }
 
-       _logger->log_debug("Enqueue flow file UUID %s to connection %s",
-                       flow->getUUIDStr().c_str(), _name.c_str());
+       // Notify receiving processor that work may be available
+       if(_destProcessor)
+       {
+               _destProcessor->notifyWork();
+       }
 }
 
 FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> 
&expiredFlowRecords)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp 
b/libminifi/src/EventDrivenSchedulingAgent.cpp
new file mode 100644
index 0000000..ed1d8ea
--- /dev/null
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -0,0 +1,47 @@
+/**
+ * @file EventDrivenSchedulingAgent.cpp
+ * EventDrivenSchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <thread>
+#include <iostream>
+#include "Property.h"
+#include "EventDrivenSchedulingAgent.h"
+
+// Worker-thread loop: trigger the processor, back off if asked to, then wait for work.
+void EventDrivenSchedulingAgent::run(Processor *processor)
+{
+       while (_running)
+       {
+               const bool wantsBackoff = onTrigger(processor);
+
+               if (processor->isYield())
+               {
+                       // The processor reports a yield; sleep for the time it asks for
+                       std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
+               }
+               else if (wantsBackoff && _boredYieldDuration > 0)
+               {
+                       // onTrigger() requested a yield; apply the configured bored-yield delay
+                       std::this_thread::sleep_for(std::chrono::milliseconds(_boredYieldDuration));
+               }
+
+               // Wait (at most 1000 ms) until the processor has incoming work
+               processor->waitForWork(1000);
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index caaa8ea..dce9e34 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -117,10 +117,13 @@ void FlowController::stop(bool force)
        {
         _logger->log_info("Stop Flow Controller");
         this->_timerScheduler.stop();
+        this->_eventScheduler.stop();
         // Wait for sometime for thread stop
         std::this_thread::sleep_for(std::chrono::milliseconds(1000));
         if (this->_root)
-            this->_root->stopProcessing(&this->_timerScheduler);
+            this->_root->stopProcessing(
+                    &this->_timerScheduler,
+                    &this->_eventScheduler);
         _running = false;
     }
 }
@@ -621,7 +624,7 @@ void FlowController::parseRemoteProcessGroupYaml(YAML::Node 
*rpgNode, ProcessGro
                         this->parsePortYaml(&currPort, group, RECEIVE);
                     } // for node
                 }
-                               
+
             }
         }
     }
@@ -1197,8 +1200,11 @@ bool FlowController::start() {
         if (!_running) {
             _logger->log_info("Start Flow Controller");
             this->_timerScheduler.start();
+            this->_eventScheduler.start();
             if (this->_root)
-                this->_root->startProcessing(&this->_timerScheduler);
+                this->_root->startProcessing(
+                        &this->_timerScheduler,
+                        &this->_eventScheduler);
             _running = true;
             this->_protocol->start();
         }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
index 70ee9d7..7c98278 100644
--- a/libminifi/src/ProcessGroup.cpp
+++ b/libminifi/src/ProcessGroup.cpp
@@ -127,7 +127,8 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child)
        }
 }
 
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+               EventDrivenSchedulingAgent *eventScheduler)
 {
        std::lock_guard<std::mutex> lock(_mtx);
 
@@ -141,13 +142,15 @@ void 
ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
                        {
                                if (processor->getSchedulingStrategy() == 
TIMER_DRIVEN)
                                        timeScheduler->schedule(processor);
+                               else if (processor->getSchedulingStrategy() == 
EVENT_DRIVEN)
+                                       eventScheduler->schedule(processor);
                        }
                }
 
                for (std::set<ProcessGroup *>::iterator it = 
_childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
                {
                        ProcessGroup *processGroup(*it);
-                       processGroup->startProcessing(timeScheduler);
+                       processGroup->startProcessing(timeScheduler, 
eventScheduler);
                }
        }
        catch (std::exception &exception)
@@ -162,7 +165,8 @@ void 
ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
        }
 }
 
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+               EventDrivenSchedulingAgent *eventScheduler)
 {
        std::lock_guard<std::mutex> lock(_mtx);
 
@@ -174,12 +178,14 @@ void 
ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
                        Processor *processor(*it);
                        if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
                                        timeScheduler->unschedule(processor);
+                       else if (processor->getSchedulingStrategy() == 
EVENT_DRIVEN)
+                                       eventScheduler->unschedule(processor);
                }
 
                for (std::set<ProcessGroup *>::iterator it = 
_childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
                {
                        ProcessGroup *processGroup(*it);
-                       processGroup->stopProcessing(timeScheduler);
+                       processGroup->stopProcessing(timeScheduler, 
eventScheduler);
                }
        }
        catch (std::exception &exception)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp
index cc136dc..8da253a 100644
--- a/libminifi/src/Processor.cpp
+++ b/libminifi/src/Processor.cpp
@@ -449,3 +449,64 @@ void Processor::onTrigger()
                throw;
        }
 }
+
+// Blocks the caller until work is available for this processor or timeoutMs elapses.
+void Processor::waitForWork(uint64_t timeoutMs)
+{
+       std::unique_lock<std::mutex> guard(_workAvailableMtx);
+       _hasWork = isWorkAvailable();
+       if (_hasWork)
+               return; // work is already queued, no need to sleep
+
+       // The mutex is released while waiting and again by the guard's destructor on return;
+       // the predicate re-reads the flag that notifyWork() updates under the same mutex
+       _hasWorkCondition.wait_for(guard, std::chrono::milliseconds(timeoutMs), [this] { return _hasWork; });
+}
+
+// Re-checks the incoming connections and wakes one waiting thread if work is queued.
+void Processor::notifyWork()
+{
+       // Processors that are not event-driven are never notified
+       if (_strategy == EVENT_DRIVEN)
+       {
+               bool workQueued = false;
+
+               {
+                       // Update the shared flag under the mutex, keeping a local copy
+                       // so it can be tested after the mutex has been released
+                       std::lock_guard<std::mutex> guard(_workAvailableMtx);
+                       _hasWork = isWorkAvailable();
+                       workQueued = _hasWork;
+               }
+
+               // Notify after releasing the mutex
+               if (workQueued)
+               {
+                       _hasWorkCondition.notify_one();
+               }
+       }
+}
+
+// Reports whether any incoming connection currently has a non-empty queue.
+bool Processor::isWorkAvailable()
+{
+       try
+       {
+               // One non-empty incoming queue is enough; stop at the first one found
+               for (const auto &incoming : getIncomingConnections())
+               {
+                       if (incoming->getQueueSize() > 0)
+                       {
+                               return true;
+                       }
+               }
+       }
+       catch (...)
+       {
+               // Best effort: a failed check is logged and treated as "no work"
+               _logger->log_error("Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!");
+       }
+
+       // No incoming connection reported queued items, or the check threw
+       return false;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp 
b/libminifi/src/SchedulingAgent.cpp
index 211c328..a81cd55 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -82,5 +82,4 @@ bool SchedulingAgent::onTrigger(Processor *processor)
        }
 
        return false;
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp 
b/libminifi/src/ThreadedSchedulingAgent.cpp
new file mode 100644
index 0000000..0338019
--- /dev/null
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -0,0 +1,113 @@
+/**
+ * @file ThreadedSchedulingAgent.cpp
+ * ThreadedSchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <thread>
+#include <iostream>
+
+#include "ThreadedSchedulingAgent.h"
+
+// Reads the yield durations from the configuration, then starts one detached
+// worker thread per concurrent task of the processor; each thread executes run().
+void ThreadedSchedulingAgent::schedule(Processor *processor)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       // Each duration is reset to 0 before the configured value, if any, is parsed
+       _administrativeYieldDuration = 0;
+       std::string yieldValue;
+
+       if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue))
+       {
+               TimeUnit unit;
+               if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) &&
+                                       Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration))
+               {
+                       _logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration);
+               }
+       }
+
+       _boredYieldDuration = 0;
+       if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue))
+       {
+               TimeUnit unit;
+               if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) &&
+                                       Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration))
+               {
+                       _logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration);
+               }
+       }
+
+       if (processor->getScheduledState() != RUNNING)
+       {
+               _logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str());
+               return;
+       }
+
+       // Refuse to start a second set of threads for a processor that already has an entry
+       if (_threads.find(processor->getUUIDStr()) != _threads.end())
+       {
+               _logger->log_info("Can not schedule threads for processor %s because there are existing threads running", processor->getName().c_str());
+               return;
+       }
+
+       std::vector<std::thread *> threads;
+       for (int i = 0; i < processor->getMaxConcurrentTasks(); i++)
+       {
+               std::thread *thread = new std::thread([this, processor] () { this->run(processor); });
+               // Log before detach(): afterwards get_id() no longer identifies the worker thread
+               _logger->log_info("Scheduled thread %d running for process %s", thread->get_id(),
+                               processor->getName().c_str());
+               thread->detach();
+               threads.push_back(thread);
+       }
+       _threads[processor->getUUIDStr()] = threads;
+}
+
+// Deletes the thread handles stored for the processor, erases its _threads entry, then calls clearActiveTask().
+void ThreadedSchedulingAgent::unschedule(Processor *processor)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       _logger->log_info("Shutting down threads for processor %s/%s",
+                       processor->getName().c_str(),
+                       processor->getUUIDStr().c_str());
+
+       if (processor->getScheduledState() != RUNNING)
+       {
+               _logger->log_info("Cannot unschedule threads for processor %s because it is not running", processor->getName().c_str());
+               return;
+       }
+
+       auto entry = _threads.find(processor->getUUIDStr());
+       if (entry == _threads.end())
+       {
+               _logger->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str());
+               return;
+       }
+
+       // Only the handle objects are freed here; schedule() detached the threads themselves
+       for (std::thread *worker : entry->second)
+       {
+               _logger->log_info("Scheduled thread %d deleted for process %s", worker->get_id(),
+                               processor->getName().c_str());
+               delete worker;
+       }
+       _threads.erase(processor->getUUIDStr());
+       processor->clearActiveTask();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp 
b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 3ce57ae..09b7ed6 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -23,112 +23,23 @@
 #include "Property.h"
 #include "TimerDrivenSchedulingAgent.h"
 
-void TimerDrivenSchedulingAgent::schedule(Processor *processor)
+void TimerDrivenSchedulingAgent::run(Processor *processor)
 {
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _administrativeYieldDuration = 0;
-       std::string yieldValue;
-
-       if (_configure->get(Configure::nifi_administrative_yield_duration, 
yieldValue))
+       while (this->_running)
        {
-               TimeUnit unit;
-               if (Property::StringToTime(yieldValue, 
_administrativeYieldDuration, unit) &&
-                                       
Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, 
_administrativeYieldDuration))
-               {
-                       _logger->log_debug("nifi_administrative_yield_duration: 
[%d] ms", _administrativeYieldDuration);
-               }
-       }
-
-       _boredYieldDuration = 0;
-       if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(yieldValue, _boredYieldDuration, 
unit) &&
-                                       
Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration))
-               {
-                       _logger->log_debug("nifi_bored_yield_duration: [%d] 
ms", _boredYieldDuration);
-               }
-       }
-
-       if (processor->getScheduledState() != RUNNING)
-       {
-               _logger->log_info("Can not schedule threads for processor %s 
because it is not running", processor->getName().c_str());
-               return;
-       }
-
-       std::map<std::string, std::vector<std::thread *>>::iterator it =
-                       _threads.find(processor->getUUIDStr());
-       if (it != _threads.end())
-       {
-               _logger->log_info("Can not schedule threads for processor %s 
because there are existed thread running");
-               return;
-       }
-
-       std::vector<std::thread *> threads;
-       for (int i = 0; i < processor->getMaxConcurrentTasks(); i++)
-       {
-               std::thread *thread = new std::thread(run, this, processor);
-               thread->detach();
-               threads.push_back(thread);
-               _logger->log_info("Scheduled Time Driven thread %d running for 
process %s", thread->get_id(),
-                               processor->getName().c_str());
-       }
-       _threads[processor->getUUIDStr().c_str()] = threads;
-
-       return;
-}
-
-void TimerDrivenSchedulingAgent::unschedule(Processor *processor)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       if (processor->getScheduledState() != RUNNING)
-       {
-               _logger->log_info("Can not unschedule threads for processor %s 
because it is not running", processor->getName().c_str());
-               return;
-       }
-
-       std::map<std::string, std::vector<std::thread *>>::iterator it =
-                       _threads.find(processor->getUUIDStr());
-
-       if (it == _threads.end())
-       {
-               _logger->log_info("Can not unschedule threads for processor %s 
because there are no existed thread running");
-               return;
-       }
-       for (std::vector<std::thread *>::iterator itThread = 
it->second.begin(); itThread != it->second.end(); ++itThread)
-       {
-               std::thread *thread = *itThread;
-               _logger->log_info("Scheduled Time Driven thread %d deleted for 
process %s", thread->get_id(),
-                               processor->getName().c_str());
-               delete thread;
-       }
-       _threads.erase(processor->getUUIDStr());
-       processor->clearActiveTask();
-
-       return;
-}
-
-void TimerDrivenSchedulingAgent::run(TimerDrivenSchedulingAgent *agent, 
Processor *processor)
-{
-       while (agent->_running)
-       {
-               bool shouldYield = agent->onTrigger(processor);
+               bool shouldYield = this->onTrigger(processor);
 
                if (processor->isYield())
                {
                        // Honor the yield
                        
std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
                }
-               else if (shouldYield && agent->_boredYieldDuration > 0)
+               else if (shouldYield && this->_boredYieldDuration > 0)
                {
                        // No work to do or need to apply back pressure
-                       
std::this_thread::sleep_for(std::chrono::milliseconds(agent->_boredYieldDuration));
+                       
std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration));
                }
                
std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
        }
        return;
 }
-
-

Reply via email to