http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/RealTimeDataCollector.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RealTimeDataCollector.cpp b/libminifi/src/RealTimeDataCollector.cpp deleted file mode 100644 index 7dd6469..0000000 --- a/libminifi/src/RealTimeDataCollector.cpp +++ /dev/null @@ -1,481 +0,0 @@ -/** - * @file RealTimeDataCollector.cpp - * RealTimeDataCollector class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <random> -#include <netinet/tcp.h> - -#include "utils/StringUtils.h" -#include "RealTimeDataCollector.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector"); -Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real time processor to process", "data.osp"); -Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", "Real Time Server Name", "localhost"); -Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", "Real Time Server Port", "10000"); -Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch Server Name", "localhost"); -Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch Server Port", "10001"); -Property RealTimeDataCollector::ITERATION("Iteration", - "If true, sample osp file will be iterated", "true"); -Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real Time Message ID", "41"); -Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message ID", "172, 30, 48"); -Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real Time Data Collection Interval in msec", "10 ms"); -Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch Processing Interval in msec", "100 ms"); -Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144"); -Relationship RealTimeDataCollector::Success("success", "success operational on the flow record"); - -void RealTimeDataCollector::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(FILENAME); - properties.insert(REALTIMESERVERNAME); - properties.insert(REALTIMESERVERPORT); - properties.insert(BATCHSERVERNAME); - properties.insert(BATCHSERVERPORT); - properties.insert(ITERATION); - properties.insert(REALTIMEMSGID); - properties.insert(BATCHMSGID); - properties.insert(REALTIMEINTERVAL); - properties.insert(BATCHINTERVAL); - properties.insert(BATCHMAXBUFFERSIZE); - - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); - -} - -int RealTimeDataCollector::connectServer(const char *host, uint16_t port) -{ - in_addr_t addr; - int sock = 0; - struct hostent *h; -#ifdef __MACH__ - h = gethostbyname(host); -#else - char buf[1024]; - struct hostent he; - int hh_errno; - gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); -#endif - memcpy((char *) &addr, h->h_addr_list[0], h->h_length); - sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) - { - logger_->log_error("Could not create socket to hostName %s", host); - return 0; - } - -#ifndef __MACH__ - int opt = 1; - bool nagle_off = true; - - if (nagle_off) - { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) - { - logger_->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - return 0; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&opt, sizeof(opt)) < 0) - { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return 0; - } - } - - int sndsize = 256*1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) - { - logger_->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return 0; - } -#endif - - struct sockaddr_in sa; - socklen_t socklen; - int status; - - //TODO bind socket to the interface - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = htonl(INADDR_ANY); - sa.sin_port = htons(0); - socklen = sizeof(sa); - if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) - { - logger_->log_error("socket bind failed"); - close(sock); - return 0; - } - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = addr; - sa.sin_port = htons(port); - socklen = sizeof(sa); - - status = connect(sock, (struct sockaddr *)&sa, socklen); - - if (status < 0) - { - logger_->log_error("socket connect failed to %s %d", host, port); - close(sock); - return 0; - } - - logger_->log_info("socket %d connect to server %s port %d success", sock, host, port); - - return sock; -} - -int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen) -{ - int ret = 0, bytes = 0; - - while (bytes < buflen) - { - ret = send(socket, buf+bytes, buflen-bytes, 0); - //check for errors - if (ret == -1) - { - return ret; - } - bytes+=ret; - } - - if (ret) - logger_->log_debug("Send data size %d over socket %d", buflen, socket); - - return ret; -} - -void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, ProcessSession *session) -{ - if (_realTimeAccumulated >= this->_realTimeInterval) - { - std::string value; - if (this->getProperty(REALTIMEMSGID.getName(), value)) - { - this->_realTimeMsgID.clear(); - this->logger_->log_info("Real Time Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while(std::getline(lineStream, cell, ',')) - { - this->_realTimeMsgID.push_back(cell); - // this->logger_->log_debug("Real Time Msg ID %s", cell.c_str()); - } - } - if (this->getProperty(BATCHMSGID.getName(), value)) - { - this->_batchMsgID.clear(); - this->logger_->log_info("Batch Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while(std::getline(lineStream, cell, ',')) - { - cell = StringUtils::trim(cell); - this->_batchMsgID.push_back(cell); - } - } - // Open the file - if (!this->_fileStream.is_open()) - { - _fileStream.open(this->_fileName.c_str(), std::ifstream::in); - if (this->_fileStream.is_open()) - logger_->log_debug("open %s", _fileName.c_str()); - } - if (!_fileStream.good()) - { - logger_->log_error("load data file failed %s", _fileName.c_str()); - return; - } - if (this->_fileStream.is_open()) - { - std::string line; - - while (std::getline(_fileStream, line)) - { - line += "\n"; - std::stringstream lineStream(line); - std::string cell; - if (std::getline(lineStream, cell, ',')) - { - cell = StringUtils::trim(cell); - // Check whether it match to the batch traffic - for (std::vector<std::string>::iterator it = _batchMsgID.begin(); it != _batchMsgID.end(); ++it) - { - if (cell == *it) - { - // push the batch data to the queue - std::lock_guard<std::mutex> lock(_mtx); - while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) - { - std::string item = _queue.front(); - _queuedDataSize -= item.size(); - logger_->log_debug("Pop item size %d from batch queue, queue buffer size %d", item.size(), _queuedDataSize); - _queue.pop(); - } - _queue.push(line); - _queuedDataSize += line.size(); - logger_->log_debug("Push batch msg ID %s into batch queue, queue buffer size %d", cell.c_str(), _queuedDataSize); - } - } - bool findRealTime = false; - // Check whether it match to the real time traffic - for (std::vector<std::string>::iterator it = _realTimeMsgID.begin(); it != _realTimeMsgID.end(); ++it) - { - if (cell == *it) - { - int status = 0; - if (this->_realTimeSocket <= 0) - { - // Connect the LTE socket - uint16_t port = _realTimeServerPort; - this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port); - } - if (this->_realTimeSocket) - { - // try to send the data - status = sendData(_realTimeSocket, line.data(), line.size()); - if (status < 0) - { - close(_realTimeSocket); - _realTimeSocket = 0; - } - } - if (this->_realTimeSocket <= 0 || status < 0) - { - // push the batch data to the queue - std::lock_guard<std::mutex> lock(_mtx); - while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) - { - std::string item = _queue.front(); - _queuedDataSize -= item.size(); - logger_->log_debug("Pop item size %d from batch queue, queue buffer size %d", item.size(), _queuedDataSize); - _queue.pop(); - } - _queue.push(line); - _queuedDataSize += line.size(); - logger_->log_debug("Push real time msg ID %s into batch queue, queue buffer size %d", cell.c_str(), _queuedDataSize); - } - // find real time - findRealTime = true; - } // cell - } // for real time pattern - if (findRealTime) - // we break the while once we find the first real time - break; - } // if get line - } // while - if (_fileStream.eof()) - { - _fileStream.close(); - } - } // if open - _realTimeAccumulated = 0; - } - _realTimeAccumulated += context->getProcessor()->getSchedulingPeriodNano(); -} - -void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, ProcessSession *session) -{ - if (_batchAcccumulated >= this->_batchInterval) - { - // logger_->log_info("onTriggerBatch"); - // dequeue the batch and send over WIFI - int status = 0; - if (this->_batchSocket <= 0) - { - // Connect the WIFI socket - uint16_t port = _batchServerPort; - this->_batchSocket = connectServer(_batchServerName.c_str(), port); - } - if (this->_batchSocket) - { - std::lock_guard<std::mutex> lock(_mtx); - - while (!_queue.empty()) - { - std::string line = _queue.front(); - status = sendData(_batchSocket, line.data(), line.size()); - _queue.pop(); - _queuedDataSize -= line.size(); - if (status < 0) - { - close(_batchSocket); - _batchSocket = 0; - break; - } - } - } - _batchAcccumulated = 0; - } - _batchAcccumulated += context->getProcessor()->getSchedulingPeriodNano(); -} - -void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::thread::id id = std::this_thread::get_id(); - - if (id == _realTimeThreadId) - return onTriggerRealTime(context, session); - else if (id == _batchThreadId) - return onTriggerBatch(context, session); - else - { - std::lock_guard<std::mutex> lock(_mtx); - if (!this->_firstInvoking) - { - this->_fileName = "data.osp"; - std::string value; - if (this->getProperty(FILENAME.getName(), value)) - { - this->_fileName = value; - this->logger_->log_info("Data Collector File Name %s", _fileName.c_str()); - } - this->_realTimeServerName = "localhost"; - if (this->getProperty(REALTIMESERVERNAME.getName(), value)) - { - this->_realTimeServerName = value; - this->logger_->log_info("Real Time Server Name %s", this->_realTimeServerName.c_str()); - } - this->_realTimeServerPort = 10000; - if (this->getProperty(REALTIMESERVERPORT.getName(), value)) - { - Property::StringToInt(value, _realTimeServerPort); - this->logger_->log_info("Real Time Server Port %d", _realTimeServerPort); - } - if (this->getProperty(BATCHSERVERNAME.getName(), value)) - { - this->_batchServerName = value; - this->logger_->log_info("Batch Server Name %s", this->_batchServerName.c_str()); - } - this->_batchServerPort = 10001; - if (this->getProperty(BATCHSERVERPORT.getName(), value)) - { - Property::StringToInt(value, _batchServerPort); - this->logger_->log_info("Batch Server Port %d", _batchServerPort); - } - if (this->getProperty(ITERATION.getName(), value)) - { - StringUtils::StringToBool(value, this->_iteration); - logger_->log_info("Iteration %d", _iteration); - } - this->_realTimeInterval = 10000000; //10 msec - if (this->getProperty(REALTIMEINTERVAL.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _realTimeInterval, unit) && - Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval)) - { - logger_->log_info("Real Time Interval: [%d] ns", _realTimeInterval); - } - } - this->_batchInterval = 100000000; //100 msec - if (this->getProperty(BATCHINTERVAL.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _batchInterval, unit) && - Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval)) - { - logger_->log_info("Batch Time Interval: [%d] ns", _batchInterval); - } - } - this->_batchMaxBufferSize = 256*1024; - if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value)) - { - Property::StringToInt(value, _batchMaxBufferSize); - this->logger_->log_info("Batch Max Buffer Size %d", _batchMaxBufferSize); - } - if (this->getProperty(REALTIMEMSGID.getName(), value)) - { - this->logger_->log_info("Real Time Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while(std::getline(lineStream, cell, ',')) - { - this->_realTimeMsgID.push_back(cell); - this->logger_->log_info("Real Time Msg ID %s", cell.c_str()); - } - } - if (this->getProperty(BATCHMSGID.getName(), value)) - { - this->logger_->log_info("Batch Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while(std::getline(lineStream, cell, ',')) - { - cell = StringUtils::trim(cell); - this->_batchMsgID.push_back(cell); - this->logger_->log_info("Batch Msg ID %s", cell.c_str()); - } - } - // Connect the LTE socket - uint16_t port = _realTimeServerPort; - - this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port); - - // Connect the WIFI socket - port = _batchServerPort; - - this->_batchSocket = connectServer(_batchServerName.c_str(), port); - - // Open the file - _fileStream.open(this->_fileName.c_str(), std::ifstream::in); - if (!_fileStream.good()) - { - logger_->log_error("load data file failed %s", _fileName.c_str()); - return; - } - else - { - logger_->log_debug("open %s", _fileName.c_str()); - } - _realTimeThreadId = id; - this->_firstInvoking = true; - } - else - { - if (id != _realTimeThreadId) - _batchThreadId = id; - this->_firstInvoking = false; - } - } -}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index dd1d035..9790256 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -29,93 +29,116 @@ #include "RemoteProcessorGroupPort.h" +#include "../include/io/StreamFactory.h" #include "io/ClientSocket.h" -#include "io/SocketFactory.h" - #include "utils/TimeUtil.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort"); -Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost"); -Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); -Relationship RemoteProcessorGroupPort::relation; - -void RemoteProcessorGroupPort::initialize() -{ - - //! Set the supported properties - std::set<Property> properties; - properties.insert(hostName); - properties.insert(port); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(relation); - setSupportedRelationships(relationships); +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +const std::string RemoteProcessorGroupPort::ProcessorName( + "RemoteProcessorGroupPort"); +core::Property RemoteProcessorGroupPort::hostName("Host Name", + "Remote Host Name.", + "localhost"); +core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); +core::Relationship RemoteProcessorGroupPort::relation; + + +std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol() { + std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_); + if (available_protocols_.empty()) + return nullptr; + + std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(available_protocols_.top()); + available_protocols_.pop(); + return std::move(return_pointer); +} + +void RemoteProcessorGroupPort::returnProtocol( + std::unique_ptr<Site2SiteClientProtocol> return_protocol) { + std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_); + available_protocols_.push(std::move(return_protocol)); +} + +void RemoteProcessorGroupPort::initialize() { + + // Set the supported properties + std::set<core::Property> properties; + properties.insert(hostName); + properties.insert(port); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(relation); + setSupportedRelationships(relationships); } -void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - - if (!transmitting_) - return; - - std::string host = peer_.getHostName(); - uint16_t sport = peer_.getPort(); - int64_t lvalue; - - if (context->getProperty(hostName.getName(), value)) - { - host = value; - } - if (context->getProperty(port.getName(), value) && Property::StringToInt(value, lvalue)) - { - sport = (uint16_t) lvalue; - } - - if (host != peer_.getHostName() || sport != peer_.getPort()) - - { - - std::unique_ptr<DataStream> str = std::unique_ptr<DataStream>(SocketFactory::getInstance()->createSocket(host,sport)); - peer_ = std::move(Site2SitePeer (std::move(str), host, sport)); - protocol_->setPeer(&peer_); - - } - - - - bool needReset = false; - - - if (host != peer_.getHostName()) - { - peer_.setHostName(host); - needReset= true; - } - if (sport != peer_.getPort()) - { - peer_.setPort(sport); - needReset = true; - } - if (needReset) - protocol_->tearDown(); - - if (!protocol_->bootstrap()) - { - // bootstrap the client protocol if needeed - context->yield(); - logger_->log_error("Site2Site bootstrap failed yield period %d peer ", context->getProcessor()->getYieldPeriodMsec()); - return; - } - - if (direction_ == RECEIVE) - protocol_->receiveFlowFiles(context, session); - else - protocol_->transferFlowFiles(context, session); - - return; +void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + std::string value; + + if (!transmitting_) + return; + + std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol(); + + // Peer Connection + if (protocol_ == nullptr) { + + protocol_ = std::unique_ptr<Site2SiteClientProtocol>( + new Site2SiteClientProtocol(0)); + protocol_->setPortId(protocol_uuid_); + protocol_->setTimeOut(timeout_); + + std::string host = ""; + uint16_t sport = 0; + int64_t lvalue; + + if (context->getProperty(hostName.getName(), value)) { + host = value; + } + if (context->getProperty(port.getName(), value) + && core::Property::StringToInt(value, lvalue)) { + sport = (uint16_t) lvalue; + } + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = + std::unique_ptr<org::apache::nifi::minifi::io::DataStream>( + org::apache::nifi::minifi::io::StreamFactory::getInstance() + ->createSocket(host, sport)); + + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>( + new Site2SitePeer(std::move(str), host, sport)); + + protocol_->setPeer(std::move(peer_)); + } + + if (!protocol_->bootstrap()) { + // bootstrap the client protocol if needeed + context->yield(); + std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( + context->getProcessorNode().getProcessor()); + logger_->log_error("Site2Site bootstrap failed yield period %d peer ", + processor->getYieldPeriodMsec()); + return; + } + + if (direction_ == RECEIVE) + protocol_->receiveFlowFiles(context, session); + else + protocol_->transferFlowFiles(context, session); + + returnProtocol(std::move(protocol_)); + + return; } + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Repository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Repository.cpp b/libminifi/src/Repository.cpp deleted file mode 100644 index 1a3c7b0..0000000 --- a/libminifi/src/Repository.cpp +++ /dev/null @@ -1,140 +0,0 @@ -/** - * @file Repository.cpp - * Repository implemenatation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <cstdint> -#include <vector> -#include <arpa/inet.h> -#include "io/DataStream.h" -#include "io/Serializable.h" -#include "Relationship.h" -#include "Logger.h" -#include "FlowController.h" -#include "Repository.h" -#include "Provenance.h" -#include "FlowFileRepository.h" - -const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenace Repository", "FlowFile Repository"}; -uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0}; - -void Repository::start() { - if (!_enable) - return; - if (this->_purgePeriod <= 0) - return; - if (_running) - return; - _running = true; - logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]); - _thread = new std::thread(run, this); - _thread->detach(); -} - -void Repository::stop() { - if (!_running) - return; - _running = false; - logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]); -} - -void Repository::run(Repository *repo) { -#ifdef LEVELDB_SUPPORT - // threshold for purge - uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4; - while (repo->_running) { - std::this_thread::sleep_for( - std::chrono::milliseconds(repo->_purgePeriod)); - uint64_t curTime = getTimeMillis(); - uint64_t size = repo->repoSize(); - if (size >= purgeThreshold) { - std::vector<std::string> purgeList; - leveldb::Iterator* it = repo->_db->NewIterator( - leveldb::ReadOptions()); - if (repo->_type == PROVENANCE) - { - for (it->SeekToFirst(); it->Valid(); it->Next()) { - ProvenanceEventRecord eventRead; - std::string key = it->key().ToString(); - if (eventRead.DeSerialize((uint8_t *) it->value().data(), - (int) it->value().size())) { - if ((curTime - eventRead.getEventTime()) - > repo->_maxPartitionMillis) - purgeList.push_back(key); - } else { - repo->logger_->log_debug( - "NiFi %s retrieve event %s fail", - RepositoryTypeStr[repo->_type], - key.c_str()); - purgeList.push_back(key); - } - } - } - if (repo->_type == FLOWFILE) - { - for (it->SeekToFirst(); it->Valid(); it->Next()) { - FlowFileEventRecord eventRead; - std::string key = it->key().ToString(); - if (eventRead.DeSerialize((uint8_t *) it->value().data(), - (int) it->value().size())) { - if ((curTime - eventRead.getEventTime()) - > repo->_maxPartitionMillis) - purgeList.push_back(key); - } else { - repo->logger_->log_debug( - "NiFi %s retrieve event %s fail", - RepositoryTypeStr[repo->_type], - key.c_str()); - purgeList.push_back(key); - } - } - } - delete it; - for (auto eventId : purgeList) - { - repo->logger_->log_info("Repository Repo %s Purge %s", - RepositoryTypeStr[repo->_type], - eventId.c_str()); - repo->Delete(eventId); - } - } - if (size > repo->_maxPartitionBytes) - repo->_repoFull = true; - else - repo->_repoFull = false; - } -#endif - return; -} - -//! repoSize -uint64_t Repository::repoSize() -{ - _repoSize[_type] = 0; - if (_type == PROVENANCE) - { - if (ftw(_directory.c_str(), repoSumProvenance, 1) != 0) - _repoSize[_type] = 0; - } - if (_type == FLOWFILE) - { - if (ftw(_directory.c_str(), repoSumFlowFile, 1) != 0) - _repoSize[_type] = 0; - } - return _repoSize[_type]; -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ResourceClaim.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp index be52b49..826ca1d 100644 --- a/libminifi/src/ResourceClaim.cpp +++ b/libminifi/src/ResourceClaim.cpp @@ -23,27 +23,36 @@ #include "ResourceClaim.h" -std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0); +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0); -std::string ResourceClaim::default_directory_path=DEFAULT_CONTENT_DIRECTORY; +std::string ResourceClaim::default_directory_path = DEFAULT_CONTENT_DIRECTORY; ResourceClaim::ResourceClaim(const std::string contentDirectory) -: _id(_localResourceClaimNumber.load()), - _flowFileRecordOwnedCount(0) -{ - - char uuidStr[37]; - - // Generate the global UUID for the resource claim - uuid_generate(_uuid); - // Increase the local ID for the resource claim - ++_localResourceClaimNumber; - uuid_unparse_lower(_uuid, uuidStr); - // Create the full content path for the content - _contentFullPath = contentDirectory + "/" + uuidStr; - - configure_ = Configure::getConfigure(); - logger_ = Logger::getLogger(); - logger_->log_debug("Resource Claim created %s", uuidStr); + : _id(_localResourceClaimNumber.load()), + _flowFileRecordOwnedCount(0) { + + char uuidStr[37]; + + // Generate the global UUID for the resource claim + uuid_generate(_uuid); + // Increase the local ID for the resource claim + ++_localResourceClaimNumber; + uuid_unparse_lower(_uuid, uuidStr); + // Create the full content path for the content + _contentFullPath = contentDirectory + "/" + uuidStr; + + configure_ = Configure::getConfigure(); + + logger_ = logging::Logger::getLogger(); + logger_->log_debug("Resource Claim created %s", _contentFullPath.c_str()); } + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index 984abdc..8cb88e0 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -22,64 +22,71 @@ #include <iostream> #include "Exception.h" #include "SchedulingAgent.h" +#include "core/Processor.h" -bool SchedulingAgent::hasWorkToDo(Processor *processor) -{ - // Whether it has work to do - if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || - processor->flowFilesQueued()) - return true; - else - return false; +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +bool SchedulingAgent::hasWorkToDo( + std::shared_ptr<core::Processor> processor) { + // Whether it has work to do + if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() + || processor->flowFilesQueued()) + return true; + else + return false; } -bool SchedulingAgent::hasTooMuchOutGoing(Processor *processor) -{ - return processor->flowFilesOutGoingFull(); +bool SchedulingAgent::hasTooMuchOutGoing( + std::shared_ptr<core::Processor> processor) { + return processor->flowFilesOutGoingFull(); } -bool SchedulingAgent::onTrigger(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory) -{ - if (processor->isYield()) - return false; +bool SchedulingAgent::onTrigger( + std::shared_ptr<core::Processor> processor, + core::ProcessContext *processContext, + core::ProcessSessionFactory *sessionFactory) { + if (processor->isYield()) + return false; + + // No need to yield, reset yield expiration to 0 + processor->clearYield(); - // No need to yield, reset yield expiration to 0 - processor->clearYield(); + if (!hasWorkToDo(processor)) + // No work to do, yield + return true; - if (!hasWorkToDo(processor)) - // No work to do, yield - return true; + if (hasTooMuchOutGoing(processor)) + // need to apply backpressure + return true; - if(hasTooMuchOutGoing(processor)) - // need to apply backpressure - return true; + //TODO runDuration - //TODO runDuration + processor->incrementActiveTasks(); + try { + processor->onTrigger(processContext, sessionFactory); + processor->decrementActiveTask(); + } catch (Exception &exception) { + // Normal exception + logger_->log_debug("Caught Exception %s", exception.what()); + processor->decrementActiveTask(); + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + processor->yield(_administrativeYieldDuration); + processor->decrementActiveTask(); + } catch (...) { + logger_->log_debug("Caught Exception during SchedulingAgent::onTrigger"); + processor->yield(_administrativeYieldDuration); + processor->decrementActiveTask(); + } + + return false; +} - processor->incrementActiveTasks(); - try - { - processor->onTrigger(processContext, sessionFactory); - processor->decrementActiveTask(); - } - catch (Exception &exception) - { - // Normal exception - logger_->log_debug("Caught Exception %s", exception.what()); - processor->decrementActiveTask(); - } - catch (std::exception &exception) - { - logger_->log_debug("Caught Exception %s", exception.what()); - processor->yield(_administrativeYieldDuration); - processor->decrementActiveTask(); - } - catch (...) - { - logger_->log_debug("Caught Exception during SchedulingAgent::onTrigger"); - processor->yield(_administrativeYieldDuration); - processor->decrementActiveTask(); - } - return false; -} \ No newline at end of file +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
