http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index c33800d..9e6778c 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -37,8 +37,7 @@ namespace nifi { namespace minifi { namespace core { -ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, - ProcessGroup *parent) +ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent) : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()), name_(name), type_(type), @@ -60,8 +59,7 @@ ProcessGroup::~ProcessGroup() { connection->drain(); } - for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); - it != child_process_groups_.end(); ++it) { + for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) { ProcessGroup *processGroup(*it); delete processGroup; } @@ -78,8 +76,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) { if (processors_.find(processor) == processors_.end()) { // We do not have the same processor in this process group yet processors_.insert(processor); - logger_->log_info("Add processor %s into process group %s", - processor->getName().c_str(), name_.c_str()); + logger_->log_info("Add processor %s into process group %s", processor->getName().c_str(), name_.c_str()); } } @@ -89,8 +86,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) { if (processors_.find(processor) != processors_.end()) { // We do have the same processor in this process group yet processors_.erase(processor); - logger_->log_info("Remove processor %s from process group %s", - processor->getName().c_str(), name_.c_str()); + logger_->log_info("Remove processor %s from process group %s", processor->getName().c_str(), name_.c_str()); } } @@ -100,8 +96,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) { if (child_process_groups_.find(child) == child_process_groups_.end()) { // We do not have the same child process group in this process group yet child_process_groups_.insert(child); - logger_->log_info("Add child process group %s into process group %s", - child->getName().c_str(), name_.c_str()); + logger_->log_info("Add child process group %s into process group %s", child->getName().c_str(), name_.c_str()); } } @@ -111,13 +106,11 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) { if (child_process_groups_.find(child) != child_process_groups_.end()) { // We do have the same child process group in this process group yet child_process_groups_.erase(child); - logger_->log_info("Remove child process group %s from process group %s", - child->getName().c_str(), name_.c_str()); + logger_->log_info("Remove child process group %s from process group %s", child->getName().c_str(), name_.c_str()); } } -void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, - EventDrivenSchedulingAgent *eventScheduler) { +void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { std::lock_guard<std::recursive_mutex> lock(mutex_); try { @@ -125,8 +118,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, for (auto processor : processors_) { logger_->log_debug("Starting %s", processor->getName().c_str()); - if (!processor->isRunning() - && processor->getScheduledState() != DISABLED) { + if (!processor->isRunning() && processor->getScheduledState() != DISABLED) { if (processor->getSchedulingStrategy() == TIMER_DRIVEN) timeScheduler->schedule(processor); else if (processor->getSchedulingStrategy() == EVENT_DRIVEN) @@ -141,20 +133,17 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, logger_->log_debug("Caught Exception %s", exception.what()); throw; } catch (...) { - logger_->log_debug( - "Caught Exception during process group start processing"); + logger_->log_debug("Caught Exception during process group start processing"); throw; } } -void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, - EventDrivenSchedulingAgent *eventScheduler) { +void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { std::lock_guard<std::recursive_mutex> lock(mutex_); try { // Stop all the processor node, input and output ports - for (std::set<std::shared_ptr<Processor> >::iterator it = - processors_.begin(); it != processors_.end(); ++it) { + for (std::set<std::shared_ptr<Processor> >::iterator it = processors_.begin(); it != processors_.end(); ++it) { std::shared_ptr<Processor> processor(*it); if (processor->getSchedulingStrategy() == TIMER_DRIVEN) timeScheduler->unschedule(processor); @@ -162,8 +151,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, eventScheduler->unschedule(processor); } - for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); - it != child_process_groups_.end(); ++it) { + for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) { ProcessGroup *processGroup(*it); processGroup->stopProcessing(timeScheduler, eventScheduler); } @@ -195,8 +183,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { } } for (auto processGroup : child_process_groups_) { - logger_->log_info("find processor child %s", - processGroup->getName().c_str()); + logger_->log_info("find processor child %s", processGroup->getName().c_str()); std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid); if (processor) return processor; @@ -204,9 +191,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { return ret; } -void ProcessGroup::addControllerService( - const std::string &nodeId, - std::shared_ptr<core::controller::ControllerServiceNode> &node) { +void ProcessGroup::addControllerService(const std::string &nodeId, std::shared_ptr<core::controller::ControllerServiceNode> &node) { controller_service_map_.put(nodeId, node); } @@ -215,13 +200,11 @@ void ProcessGroup::addControllerService( * @param node node identifier * @return controller service node, if it exists. */ -std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService( - const std::string &nodeId) { +std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService(const std::string &nodeId) { return controller_service_map_.getControllerServiceNode(nodeId); } -std::shared_ptr<Processor> ProcessGroup::findProcessor( - const std::string &processorName) { +std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) { std::lock_guard<std::recursive_mutex> lock(mutex_); std::shared_ptr<Processor> ret = NULL; for (auto processor : processors_) { @@ -230,17 +213,14 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor( return processor; } for (auto processGroup : child_process_groups_) { - std::shared_ptr<Processor> processor = processGroup->findProcessor( - processorName); + std::shared_ptr<Processor> processor = processGroup->findProcessor(processorName); if (processor) return processor; } return ret; } -void ProcessGroup::updatePropertyValue(std::string processorName, - std::string propertyName, - std::string propertyValue) { +void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { std::lock_guard<std::recursive_mutex> lock(mutex_); for (auto processor : processors_) { if (processor->getName() == processorName) { @@ -248,14 +228,12 @@ void ProcessGroup::updatePropertyValue(std::string processorName, } } for (auto processGroup : child_process_groups_) { - processGroup->updatePropertyValue(processorName, propertyName, - propertyValue); + processGroup->updatePropertyValue(processorName, propertyName, propertyValue); } return; } -void ProcessGroup::getConnections( - std::map<std::string, std::shared_ptr<Connection>> &connectionMap) { +void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) { for (auto connection : connections_) { connectionMap[connection->getUUIDStr()] = connection; } @@ -270,8 +248,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { if (connections_.find(connection) == connections_.end()) { // We do not have the same connection in this process group yet connections_.insert(connection); - logger_->log_info("Add connection %s into process group %s", - connection->getName().c_str(), name_.c_str()); + logger_->log_info("Add connection %s into process group %s", connection->getName().c_str(), name_.c_str()); uuid_t sourceUUID; std::shared_ptr<Processor> source = NULL; connection->getSourceUUID(sourceUUID); @@ -293,8 +270,7 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) { if (connections_.find(connection) != connections_.end()) { // We do not have the same connection in this process group yet connections_.erase(connection); - logger_->log_info("Remove connection %s into process group %s", - connection->getName().c_str(), name_.c_str()); + logger_->log_info("Remove connection %s into process group %s", connection->getName().c_str(), name_.c_str()); uuid_t sourceUUID; std::shared_ptr<Processor> source = NULL; connection->getSourceUUID(sourceUUID);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 70de3f6..037660f 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -38,40 +38,31 @@ namespace core { std::shared_ptr<core::FlowFile> ProcessSession::create() { std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>( - process_context_->getProvenanceRepository(), empty); + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty); _addedFlowFiles[record->getUUIDStr()] = record; - logger_->log_debug("Create FlowFile with UUID %s", - record->getUUIDStr().c_str()); - std::string details = process_context_->getProcessorNode().getName() - + " creates flow record " + record->getUUIDStr(); + logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str()); + std::string details = process_context_->getProcessorNode().getName() + " creates flow record " + record->getUUIDStr(); provenance_report_->create(record, details); return record; } -std::shared_ptr<core::FlowFile> ProcessSession::create( - std::shared_ptr<core::FlowFile> &&parent) { +std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) { std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>( - process_context_->getProvenanceRepository(), empty); + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty); if (record) { _addedFlowFiles[record->getUUIDStr()] = record; - logger_->log_debug("Create FlowFile with UUID %s", - record->getUUIDStr().c_str()); + logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str()); } if (record) { // Copy attributes - std::map<std::string, std::string> parentAttributes = - parent->getAttributes(); + std::map<std::string, std::string> parentAttributes = parent->getAttributes(); std::map<std::string, std::string>::iterator it; for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) { - if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) - || it->first == FlowAttributeKey(DISCARD_REASON) - || it->first == FlowAttributeKey(UUID)) + if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || it->first == FlowAttributeKey(DISCARD_REASON) || it->first == FlowAttributeKey(UUID)) // Do not copy special attributes from parent continue; record->setAttribute(it->first, it->second); @@ -83,8 +74,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create( return record; } -std::shared_ptr<core::FlowFile> ProcessSession::clone( - std::shared_ptr<core::FlowFile> &parent) { +std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::FlowFile> &parent) { std::shared_ptr<core::FlowFile> record = this->create(parent); if (record) { // Copy Resource Claim @@ -100,24 +90,18 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone( return record; } -std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer( - std::shared_ptr<core::FlowFile> &parent) { +std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent) { std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>( - process_context_->getProvenanceRepository(), empty); + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty); if (record) { this->_clonedFlowFiles[record->getUUIDStr()] = record; - logger_->log_debug("Clone FlowFile with UUID %s during transfer", - record->getUUIDStr().c_str()); + logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr().c_str()); // Copy attributes - std::map<std::string, std::string> parentAttributes = - parent->getAttributes(); + std::map<std::string, std::string> parentAttributes = parent->getAttributes(); std::map<std::string, std::string>::iterator it; for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) { - if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) - || it->first == FlowAttributeKey(DISCARD_REASON) - || it->first == FlowAttributeKey(UUID)) + if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || it->first == FlowAttributeKey(DISCARD_REASON) || it->first == FlowAttributeKey(UUID)) // Do not copy special attributes from parent continue; record->setAttribute(it->first, it->second); @@ -141,18 +125,15 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer( return record; } -std::shared_ptr<core::FlowFile> ProcessSession::clone( - std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) { +std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) { std::shared_ptr<core::FlowFile> record = this->create(parent); if (record) { if (parent->getResourceClaim()) { if ((offset + size) > parent->getSize()) { // Set offset and size - logger_->log_error("clone offset %d and size %d exceed parent size %d", - offset, size, parent->getSize()); + logger_->log_error("clone offset %d and size %d exceed parent size %d", offset, size, parent->getSize()); // Remove the Add FlowFile for the session - std::map<std::string, std::shared_ptr<core::FlowFile> >::iterator it = - this->_addedFlowFiles.find(record->getUUIDStr()); + std::map<std::string, std::shared_ptr<core::FlowFile> >::iterator it = this->_addedFlowFiles.find(record->getUUIDStr()); if (it != this->_addedFlowFiles.end()) this->_addedFlowFiles.erase(record->getUUIDStr()); return nullptr; @@ -174,85 +155,65 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone( void ProcessSession::remove(std::shared_ptr<core::FlowFile> &flow) { flow->setDeleted(true); _deletedFlowFiles[flow->getUUIDStr()] = flow; - std::string reason = process_context_->getProcessorNode().getName() - + " drop flow record " + flow->getUUIDStr(); + std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr(); provenance_report_->drop(flow, reason); } void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) { flow->setDeleted(true); _deletedFlowFiles[flow->getUUIDStr()] = flow; - std::string reason = process_context_->getProcessorNode().getName() - + " drop flow record " + flow->getUUIDStr(); + std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr(); provenance_report_->drop(flow, reason); } -void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, - std::string key, std::string value) { +void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) { flow->setAttribute(key, value); - std::string details = process_context_->getProcessorNode().getName() - + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" - + value; + std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value; provenance_report_->modifyAttributes(flow, details); } -void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, - std::string key) { +void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key) { flow->removeAttribute(key); - std::string details = process_context_->getProcessorNode().getName() - + " remove flow record " + flow->getUUIDStr() + " attribute " + key; + std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key; provenance_report_->modifyAttributes(flow, details); } -void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, - std::string key, std::string value) { +void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value) { flow->setAttribute(key, value); - std::string details = process_context_->getProcessorNode().getName() - + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" - + value; + std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value; provenance_report_->modifyAttributes(flow, details); } -void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, - std::string key) { +void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key) { flow->removeAttribute(key); - std::string details = process_context_->getProcessorNode().getName() - + " remove flow record " + flow->getUUIDStr() + " attribute " + key; + std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key; provenance_report_->modifyAttributes(flow, details); } void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) { - flow->setPenaltyExpiration( - getTimeMillis() - + process_context_->getProcessorNode().getPenalizationPeriodMsec()); + flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec()); } void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &&flow) { - flow->setPenaltyExpiration( - getTimeMillis() - + process_context_->getProcessorNode().getPenalizationPeriodMsec()); + flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec()); } -void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow, - Relationship relationship) { +void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow, Relationship relationship) { _transferRelationship[flow->getUUIDStr()] = relationship; } -void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow, - Relationship relationship) { +void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow, Relationship relationship) { _transferRelationship[flow->getUUIDStr()] = relationship; } -void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, - OutputStreamCallback *callback) { +void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) { std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>( DEFAULT_CONTENT_DIRECTORY); try { std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), - std::fstream::out | std::fstream::binary | std::fstream::trunc); + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); if (fs.is_open()) { // Call the callback to write the content callback->process(&fs); @@ -271,8 +232,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ fs.close(); - std::string details = process_context_->getProcessorNode().getName() - + " modify flow record content " + flow->getUUIDStr(); + std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details, endTime - startTime); } else { @@ -299,14 +259,12 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, } } -void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, - OutputStreamCallback *callback) { +void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) { std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); try { std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), - std::fstream::out | std::fstream::binary | std::fstream::trunc); + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); if (fs.is_open()) { // Call the callback to write the content callback->process(&fs); @@ -325,8 +283,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ fs.close(); - std::string details = process_context_->getProcessorNode().getName() - + " modify flow record content " + flow->getUUIDStr(); + std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details, endTime - startTime); } else { @@ -353,8 +310,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, } } -void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, - OutputStreamCallback *callback) { +void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) { std::shared_ptr<ResourceClaim> claim = nullptr; if (flow->getResourceClaim() == nullptr) { @@ -367,8 +323,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, try { std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), - std::fstream::out | std::fstream::binary | std::fstream::app); + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app); if (fs.is_open()) { // Call the callback to write the content std::streampos oldPos = fs.tellp(); @@ -380,8 +335,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ fs.close(); - std::string details = process_context_->getProcessorNode().getName() - + " modify flow record content " + flow->getUUIDStr(); + std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details, endTime - startTime); } else { @@ -400,8 +354,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, } } -void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, - OutputStreamCallback *callback) { +void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) { std::shared_ptr<ResourceClaim> claim = nullptr; if (flow->getResourceClaim() == nullptr) { @@ -414,8 +367,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, try { std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), - std::fstream::out | std::fstream::binary | std::fstream::app); + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app); if (fs.is_open()) { // Call the callback to write the content std::streampos oldPos = fs.tellp(); @@ -427,8 +379,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ fs.close(); - std::string details = process_context_->getProcessorNode().getName() - + " modify flow record content " + flow->getUUIDStr(); + std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details, endTime - startTime); } else { @@ -447,21 +398,18 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, } } -void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, - InputStreamCallback *callback) { +void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) { try { std::shared_ptr<ResourceClaim> claim = nullptr; if (flow->getResourceClaim() == nullptr) { // No existed claim for read, we throw exception - throw Exception(FILE_OPERATION_EXCEPTION, - "No Content Claim existed for read"); + throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read"); } claim = flow->getResourceClaim(); std::ifstream fs; - fs.open(claim->getContentFullPath().c_str(), - std::fstream::in | std::fstream::binary); + fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary); if (fs.is_open()) { fs.seekg(flow->getOffset(), fs.beg); @@ -487,21 +435,18 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, } } -void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, - InputStreamCallback *callback) { +void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCallback *callback) { try { std::shared_ptr<ResourceClaim> claim = nullptr; if (flow->getResourceClaim() == nullptr) { // No existed claim for read, we throw exception - throw Exception(FILE_OPERATION_EXCEPTION, - "No Content Claim existed for read"); + throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read"); } claim = flow->getResourceClaim(); std::ifstream fs; - fs.open(claim->getContentFullPath().c_str(), - std::fstream::in | std::fstream::binary); + fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary); if (fs.is_open()) { fs.seekg(flow->getOffset(), fs.beg); @@ -533,8 +478,7 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, * @param flow flow file * */ -void ProcessSession::importFrom(io::DataStream &stream, - std::shared_ptr<core::FlowFile> &&flow) { +void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow) { std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); int max_read = getpagesize(); @@ -544,8 +488,7 @@ void ProcessSession::importFrom(io::DataStream &stream, try { std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), - std::fstream::out | std::fstream::binary | std::fstream::trunc); + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); if (fs.is_open()) { size_t position = 0; @@ -576,15 +519,11 @@ void ProcessSession::importFrom(io::DataStream &stream, flow->setResourceClaim(claim); claim->increaseFlowFileRecordOwnedCount(); - logger_->log_debug( - "Import offset %d length %d into content %s for FlowFile UUID %s", - flow->getOffset(), flow->getSize(), - flow->getResourceClaim()->getContentFullPath().c_str(), - flow->getUUIDStr().c_str()); + logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), + flow->getUUIDStr().c_str()); fs.close(); - std::string details = process_context_->getProcessorNode().getName() - + " modify flow record content " + flow->getUUIDStr(); + std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details, endTime - startTime); } else { @@ -611,9 +550,9 @@ void ProcessSession::importFrom(io::DataStream &stream, } } -void ProcessSession::import(std::string source, - std::shared_ptr<core::FlowFile> &flow, - bool keepSource, uint64_t offset) { +void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow, +bool keepSource, + uint64_t offset) { std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); char *buf = NULL; int size = 4096; @@ -622,8 +561,7 @@ void ProcessSession::import(std::string source, try { std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), - std::fstream::out | std::fstream::binary | std::fstream::trunc); + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); std::ifstream input; input.open(source.c_str(), std::fstream::in | std::fstream::binary); @@ -649,18 +587,14 @@ void ProcessSession::import(std::string source, flow->setResourceClaim(claim); claim->increaseFlowFileRecordOwnedCount(); - logger_->log_debug( - "Import offset %d length %d into content %s for FlowFile UUID %s", - flow->getOffset(), flow->getSize(), - flow->getResourceClaim()->getContentFullPath().c_str(), - flow->getUUIDStr().c_str()); + logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), + flow->getUUIDStr().c_str()); fs.close(); input.close(); if (!keepSource) std::remove(source.c_str()); - std::string details = process_context_->getProcessorNode().getName() - + " modify flow record content " + flow->getUUIDStr(); + std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details, endTime - startTime); } else { @@ -692,9 +626,9 @@ void ProcessSession::import(std::string source, } } -void ProcessSession::import(std::string source, - std::shared_ptr<core::FlowFile> &&flow, - bool keepSource, uint64_t offset) { +void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow, +bool keepSource, + uint64_t offset) { std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); char *buf = NULL; @@ -704,8 +638,7 @@ void ProcessSession::import(std::string source, try { std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), - std::fstream::out | std::fstream::binary | std::fstream::trunc); + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); std::ifstream input; input.open(source.c_str(), std::fstream::in | std::fstream::binary); @@ -731,18 +664,14 @@ void ProcessSession::import(std::string source, flow->setResourceClaim(claim); claim->increaseFlowFileRecordOwnedCount(); - logger_->log_debug( - "Import offset %d length %d into content %s for FlowFile UUID %s", - flow->getOffset(), flow->getSize(), - flow->getResourceClaim()->getContentFullPath().c_str(), - flow->getUUIDStr().c_str()); + logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), + flow->getUUIDStr().c_str()); fs.close(); input.close(); if (!keepSource) std::remove(source.c_str()); - std::string details = process_context_->getProcessorNode().getName() - + " modify flow record content " + flow->getUUIDStr(); + std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details, endTime - startTime); } else { @@ -781,21 +710,16 @@ void ProcessSession::commit() { std::shared_ptr<core::FlowFile> record = it.second; if (record->isDeleted()) continue; - std::map<std::string, Relationship>::iterator itRelationship = this - ->_transferRelationship.find(record->getUUIDStr()); + std::map<std::string, Relationship>::iterator itRelationship = this->_transferRelationship.find(record->getUUIDStr()); if (itRelationship != _transferRelationship.end()) { Relationship relationship = itRelationship->second; // Find the relationship, we need to find the connections for that relationship - std::set<std::shared_ptr<Connectable>> connections = process_context_ - ->getProcessorNode().getOutGoingConnections(relationship.getName()); + std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName()); if (connections.empty()) { // No connection - if (!process_context_->getProcessorNode().isAutoTerminated( - relationship)) { + if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) { // Not autoterminate, we should have the connect - std::string message = - "Connect empty for non auto terminated relationship" - + relationship.getName(); + std::string message = "Connect empty for non auto terminated relationship" + relationship.getName(); throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); } else { // Autoterminated @@ -803,9 +727,7 @@ void ProcessSession::commit() { } } else { // We connections, clone the flow and assign the connection accordingly - for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = - connections.begin(); itConnection != connections.end(); - ++itConnection) { + for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) { std::shared_ptr<Connectable> connection = *itConnection; if (itConnection == connections.begin()) { // First connection which the flow need be routed to @@ -817,15 +739,13 @@ void ProcessSession::commit() { if (cloneRecord) cloneRecord->setConnection(connection); else - throw Exception(PROCESS_SESSION_EXCEPTION, - "Can not clone the flow for transfer"); + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); } } } } else { // Can not find relationship for the flow - throw Exception(PROCESS_SESSION_EXCEPTION, - "Can not find the transfer relationship for the flow"); + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); } } @@ -834,21 +754,16 @@ void ProcessSession::commit() { std::shared_ptr<core::FlowFile> record = it.second; if (record->isDeleted()) continue; - std::map<std::string, Relationship>::iterator itRelationship = this - ->_transferRelationship.find(record->getUUIDStr()); + std::map<std::string, Relationship>::iterator itRelationship = this->_transferRelationship.find(record->getUUIDStr()); if (itRelationship != _transferRelationship.end()) { Relationship relationship = itRelationship->second; // Find the relationship, we need to find the connections for that relationship - std::set<std::shared_ptr<Connectable>> connections = process_context_ - ->getProcessorNode().getOutGoingConnections(relationship.getName()); + std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName()); if (connections.empty()) { // No connection - if (!process_context_->getProcessorNode().isAutoTerminated( - relationship)) { + if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) { // Not autoterminate, we should have the connect - std::string message = - "Connect empty for non auto terminated relationship " - + relationship.getName(); + std::string message = "Connect empty for non auto terminated relationship " + relationship.getName(); throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); } else { // Autoterminated @@ -856,9 +771,7 @@ void ProcessSession::commit() { } } else { // We connections, clone the flow and assign the connection accordingly - for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = - connections.begin(); itConnection != connections.end(); - ++itConnection) { + for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) { std::shared_ptr<Connectable> connection(*itConnection); if (itConnection == connections.begin()) { // First connection which the flow need be routed to @@ -870,15 +783,13 @@ void ProcessSession::commit() { if (cloneRecord) cloneRecord->setConnection(connection); else - throw Exception(PROCESS_SESSION_EXCEPTION, - "Can not clone the flow for transfer"); + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); } } } } else { // Can not find relationship for the flow - throw Exception(PROCESS_SESSION_EXCEPTION, - "Can not find the transfer relationship for the flow"); + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); } } @@ -890,8 +801,7 @@ void ProcessSession::commit() { continue; } - connection = std::static_pointer_cast<Connection>( - record->getConnection()); + connection = std::static_pointer_cast<Connection>(record->getConnection()); if ((connection) != nullptr) connection->put(record); } @@ -900,8 +810,7 @@ void ProcessSession::commit() { if (record->isDeleted()) { continue; } - connection = std::static_pointer_cast<Connection>( - record->getConnection()); + connection = std::static_pointer_cast<Connection>(record->getConnection()); if ((connection) != nullptr) connection->put(record); } @@ -911,8 +820,7 @@ void ProcessSession::commit() { if (record->isDeleted()) { continue; } - connection = std::static_pointer_cast<Connection>( - record->getConnection()); + connection = std::static_pointer_cast<Connection>(record->getConnection()); if ((connection) != nullptr) connection->put(record); } @@ -925,8 +833,7 @@ void ProcessSession::commit() { _originalFlowFiles.clear(); // persistent the provenance report this->provenance_report_->commit(); - logger_->log_trace("ProcessSession committed for %s", - process_context_->getProcessorNode().getName().c_str()); + logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode().getName().c_str()); } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); throw; @@ -942,11 +849,9 @@ void ProcessSession::rollback() { // Requeue the snapshot of the flowfile back for (const auto &it : _originalFlowFiles) { std::shared_ptr<core::FlowFile> record = it.second; - connection = std::static_pointer_cast<Connection>( - record->getOriginalConnection()); + connection = std::static_pointer_cast<Connection>(record->getOriginalConnection()); if ((connection) != nullptr) { - std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast< - FlowFileRecord>(record); + std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<FlowFileRecord>(record); flowf->setSnapShot(false); connection->put(record); } @@ -957,8 +862,7 @@ void ProcessSession::rollback() { _addedFlowFiles.clear(); _updatedFlowFiles.clear(); _deletedFlowFiles.clear(); - logger_->log_trace("ProcessSession rollback for %s", - process_context_->getProcessorNode().getName().c_str()); + logger_->log_trace("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str()); } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); throw; @@ -969,25 +873,21 @@ void ProcessSession::rollback() { } std::shared_ptr<core::FlowFile> ProcessSession::get() { - std::shared_ptr<Connectable> first = process_context_->getProcessorNode() - .getNextIncomingConnection(); + std::shared_ptr<Connectable> first = process_context_->getProcessorNode().getNextIncomingConnection(); if (first == NULL) return NULL; - std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>( - first); + std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(first); do { std::set<std::shared_ptr<core::FlowFile> > expired; std::shared_ptr<core::FlowFile> ret = current->poll(expired); if (expired.size() > 0) { // Remove expired flow record - for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired - .begin(); it != expired.end(); ++it) { + for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired.begin(); it != expired.end(); ++it) { std::shared_ptr<core::FlowFile> record = *it; - std::string details = process_context_->getProcessorNode().getName() - + " expire flow record " + record->getUUIDStr(); + std::string details = process_context_->getProcessorNode().getName() + " expire flow record " + record->getUUIDStr(); provenance_report_->expire(record, details); } } @@ -996,19 +896,15 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() { ret->setDeleted(false); _updatedFlowFiles[ret->getUUIDStr()] = ret; std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> snapshot = - std::make_shared<FlowFileRecord>( - process_context_->getProvenanceRepository(), empty); - logger_->log_debug("Create Snapshot FlowFile with UUID %s", - snapshot->getUUIDStr().c_str()); + std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty); + logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str()); snapshot = ret; // snapshot->duplicate(ret); // save a snapshot _originalFlowFiles[snapshot->getUUIDStr()] = snapshot; return ret; } - current = std::static_pointer_cast<Connection>( - process_context_->getProcessorNode().getNextIncomingConnection()); + current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode().getNextIncomingConnection()); } while (current != NULL && current != first); return NULL; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index dbeb46f..7b07638 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -45,8 +45,9 @@ namespace minifi { namespace core { Processor::Processor(std::string name, uuid_t uuid) - : Connectable(name, uuid), ConfigurableComponent(), - logger_(logging::LoggerFactory<Processor>::getLogger()) { + : Connectable(name, uuid), + ConfigurableComponent(), + logger_(logging::LoggerFactory<Processor>::getLogger()) { has_work_.store(false); // Setup the default values state_ = DISABLED; @@ -61,8 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid) active_tasks_ = 0; yield_expiration_ = 0; incoming_connections_Iter = this->_incomingConnections.begin(); - logger_->log_info("Processor %s created UUID %s", name_.c_str(), - uuidStr_.c_str()); + logger_->log_info("Processor %s created UUID %s", name_.c_str(), uuidStr_.c_str()); } bool Processor::isRunning() { @@ -77,12 +77,10 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { bool ret = false; if (isRunning()) { - logger_->log_info("Can not add connection while the process %s is running", - name_.c_str()); + logger_->log_info("Can not add connection while the process %s is running", name_.c_str()); return false; } - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>( - conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); std::lock_guard<std::mutex> lock(mutex_); uuid_t srcUUID; @@ -101,9 +99,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { if (_incomingConnections.find(connection) == _incomingConnections.end()) { _incomingConnections.insert(connection); connection->setDestination(shared_from_this()); - logger_->log_info( - "Add connection %s into Processor %s incoming connection", - connection->getName().c_str(), name_.c_str()); + logger_->log_info("Add connection %s into Processor %s incoming connection", connection->getName().c_str(), name_.c_str()); incoming_connections_Iter = this->_incomingConnections.begin(); ret = true; } @@ -122,9 +118,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { existedConnection.insert(connection); connection->setSource(shared_from_this()); out_going_connections_[relationship] = existedConnection; - logger_->log_info( - "Add connection %s into Processor %s outgoing connection for relationship %s", - connection->getName().c_str(), name_.c_str(), relationship.c_str()); + logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str()); ret = true; } } else { @@ -133,9 +127,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { newConnection.insert(connection); connection->setSource(shared_from_this()); out_going_connections_[relationship] = newConnection; - logger_->log_info( - "Add connection %s into Processor %s outgoing connection for relationship %s", - connection->getName().c_str(), name_.c_str(), relationship.c_str()); + logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str()); ret = true; } } @@ -145,9 +137,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { void Processor::removeConnection(std::shared_ptr<Connectable> conn) { if (isRunning()) { - logger_->log_info( - "Can not remove connection while the process %s is running", - name_.c_str()); + logger_->log_info("Can not remove connection while the process %s is running", name_.c_str()); return; } @@ -156,8 +146,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { uuid_t srcUUID; uuid_t destUUID; - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>( - conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); connection->getSourceUUID(srcUUID); connection->getDestinationUUID(destUUID); @@ -167,9 +156,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { if (_incomingConnections.find(connection) != _incomingConnections.end()) { _incomingConnections.erase(connection); connection->setDestination(NULL); - logger_->log_info( - "Remove connection %s into Processor %s incoming connection", - connection->getName().c_str(), name_.c_str()); + logger_->log_info("Remove connection %s into Processor %s incoming connection", connection->getName().c_str(), name_.c_str()); incoming_connections_Iter = this->_incomingConnections.begin(); } } @@ -181,13 +168,10 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { if (it == out_going_connections_.end()) { return; } else { - if (out_going_connections_[relationship].find(connection) - != out_going_connections_[relationship].end()) { + if (out_going_connections_[relationship].find(connection) != out_going_connections_[relationship].end()) { out_going_connections_[relationship].erase(connection); connection->setSource(NULL); - logger_->log_info( - "Remove connection %s into Processor %s outgoing connection for relationship %s", - connection->getName().c_str(), name_.c_str(), relationship.c_str()); + logger_->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str()); } } } @@ -200,8 +184,7 @@ bool Processor::flowFilesQueued() { return false; for (auto &&conn : _incomingConnections) { - std::shared_ptr<Connection> connection = - std::static_pointer_cast<Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); if (connection->getQueueSize() > 0) return true; } @@ -216,8 +199,7 @@ bool Processor::flowFilesOutGoingFull() { // We already has connection for this relationship std::set<std::shared_ptr<Connectable>> existedConnection = connection.second; for (const auto conn : existedConnection) { - std::shared_ptr<Connection> connection = std::static_pointer_cast< - Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); if (connection->isFull()) return true; } @@ -226,8 +208,7 @@ bool Processor::flowFilesOutGoingFull() { return false; } -void Processor::onTrigger(ProcessContext *context, - ProcessSessionFactory *sessionFactory) { +void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory) { auto session = sessionFactory->createSession(); try { @@ -251,17 +232,15 @@ bool Processor::isWorkAvailable() { try { for (const auto &conn : _incomingConnections) { - std::shared_ptr<Connection> connection = std::static_pointer_cast< - Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); if (connection->getQueueSize() > 0) { hasWork = true; break; } } } catch (...) { - logger_->log_error( - "Caught an exception while checking if work is available;" - " unless it was positively determined that work is available, assuming NO work is available!"); + logger_->log_error("Caught an exception while checking if work is available;" + " unless it was positively determined that work is available, assuming NO work is available!"); } return hasWork; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ProcessorNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp index bf39738..05f31a0 100644 --- a/libminifi/src/core/ProcessorNode.cpp +++ b/libminifi/src/core/ProcessorNode.cpp @@ -25,7 +25,8 @@ namespace core { ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor) : processor_(processor), - Connectable(processor->getName(), 0), ConfigurableComponent() { + Connectable(processor->getName(), 0), + ConfigurableComponent() { uuid_t copy; processor->getUUID(copy); setUUID(copy); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/RepositoryFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index 45ad980..cf18601 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -41,12 +41,10 @@ namespace core { class FlowFileRepository; #endif -std::shared_ptr<core::Repository> createRepository( - const std::string configuration_class_name, bool fail_safe, const std::string repo_name) { +std::shared_ptr<core::Repository> createRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) { std::shared_ptr<core::Repository> return_obj = nullptr; std::string class_name_lc = configuration_class_name; - std::transform(class_name_lc.begin(), class_name_lc.end(), - class_name_lc.begin(), ::tolower); + std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower); try { std::shared_ptr<core::Repository> return_obj = nullptr; if (class_name_lc == "flowfilerepository") { @@ -65,21 +63,17 @@ std::shared_ptr<core::Repository> createRepository( return return_obj; } if (fail_safe) { - return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, - 1); + return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1); } else { - throw std::runtime_error( - "Support for the provided configuration class could not be found"); + throw std::runtime_error("Support for the provided configuration class could not be found"); } } catch (const std::runtime_error &r) { if (fail_safe) { - return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, - 1); + return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1); } } - throw std::runtime_error( - "Support for the provided configuration class could not be found"); + throw std::runtime_error("Support for the provided configuration class could not be found"); } } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/controller/ControllerServiceNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/controller/ControllerServiceNode.cpp b/libminifi/src/core/controller/ControllerServiceNode.cpp index 12e3653..3097574 100644 --- a/libminifi/src/core/controller/ControllerServiceNode.cpp +++ b/libminifi/src/core/controller/ControllerServiceNode.cpp @@ -39,8 +39,6 @@ std::vector<std::shared_ptr<ConfigurableComponent> > &ControllerServiceNode::get return linked_components_; } - - } /* namespace controller */ } /* namespace core */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/controller/ControllerServiceProvider.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/controller/ControllerServiceProvider.cpp b/libminifi/src/core/controller/ControllerServiceProvider.cpp index da5c6a1..942c299 100644 --- a/libminifi/src/core/controller/ControllerServiceProvider.cpp +++ b/libminifi/src/core/controller/ControllerServiceProvider.cpp @@ -32,8 +32,7 @@ namespace controller { * @return the ControllerService that is registered with the given * identifier */ -std::shared_ptr<ControllerService> ControllerServiceProvider::getControllerService( - const std::string &identifier) { +std::shared_ptr<ControllerService> ControllerServiceProvider::getControllerService(const std::string &identifier) { auto service = controller_map_->getControllerServiceNode(identifier); if (service != nullptr) { return service->getControllerServiceImplementation(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/controller/StandardControllerServiceNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp index 26804f6..5c4aa70 100644 --- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp +++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp @@ -31,8 +31,7 @@ std::shared_ptr<core::ProcessGroup> &StandardControllerServiceNode::getProcessGr return process_group_; } -void StandardControllerServiceNode::setProcessGroup( - std::shared_ptr<ProcessGroup> &processGroup) { +void StandardControllerServiceNode::setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup) { std::lock_guard<std::mutex> lock(mutex_); process_group_ = processGroup; } @@ -44,8 +43,7 @@ bool StandardControllerServiceNode::enable() { if (getProperty(property.getName(), property)) { active = true; for (auto linked_service : property.getValues()) { - std::shared_ptr<ControllerServiceNode> csNode = provider - ->getControllerServiceNode(linked_service); + std::shared_ptr<ControllerServiceNode> csNode = provider->getControllerServiceNode(linked_service); if (nullptr != csNode) { std::lock_guard<std::mutex> lock(mutex_); linked_controller_services_.push_back(csNode); @@ -53,8 +51,7 @@ bool StandardControllerServiceNode::enable() { } } else { } - std::shared_ptr<ControllerService> impl = - getControllerServiceImplementation(); + std::shared_ptr<ControllerService> impl = getControllerServiceImplementation(); if (nullptr != impl) { impl->onEnable(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/logging/LoggerConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index 4bb6615..89e6a1b 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -42,25 +42,26 @@ namespace logging { const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v"; -std::vector< std::string > LoggerProperties::get_keys_of_type(const std::string &type) { +std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &type) { std::vector<std::string> appenders; std::string prefix = type + "."; for (auto const & entry : properties_) { - if (entry.first.rfind(prefix, 0) == 0 && - entry.first.find(".", prefix.length() + 1) == std::string::npos) { + if (entry.first.rfind(prefix, 0) == 0 && entry.first.find(".", prefix.length() + 1) == std::string::npos) { appenders.push_back(entry.first); } } return appenders; } -LoggerConfiguration::LoggerConfiguration() : root_namespace_(create_default_root()), loggers(std::vector<std::shared_ptr<LoggerImpl>>()), - formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) { +LoggerConfiguration::LoggerConfiguration() + : root_namespace_(create_default_root()), + loggers(std::vector<std::shared_ptr<LoggerImpl>>()), + formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) { logger_ = std::shared_ptr<LoggerImpl>(new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_))); loggers.push_back(logger_); } -void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) { +void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) { std::lock_guard<std::mutex> lock(mutex); root_namespace_ = initialize_namespaces(logger_properties); std::string spdlog_pattern; @@ -90,8 +91,7 @@ std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) return result; } -std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration:: - initialize_namespaces(const std::shared_ptr<LoggerProperties> &logger_properties) { +std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_namespaces(const std::shared_ptr<LoggerProperties> &logger_properties) { std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sink_map = logger_properties->initial_sinks(); std::string appender_type = "appender"; @@ -117,16 +117,18 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration:: try { max_files = std::stoi(max_files_str); } catch (const std::invalid_argument &ia) { - } catch (const std::out_of_range &oor) {} + } catch (const std::out_of_range &oor) { + } } - int max_file_size = 5*1024*1024; + int max_file_size = 5 * 1024 * 1024; std::string max_file_size_str = ""; if (logger_properties->get(appender_key + ".max_file_size", max_file_size_str)) { try { max_file_size = std::stoi(max_file_size_str); } catch (const std::invalid_argument &ia) { - } catch (const std::out_of_range &oor) {} + } catch (const std::out_of_range &oor) { + } } sink_map[appender_name] = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files); } else if ("stdout" == appender_type) { @@ -170,8 +172,7 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration:: } std::shared_ptr<internal::LoggerNamespace> current_namespace = root_namespace; if (logger_key != "logger.root") { - for (auto const & name : utils::StringUtils::split(logger_key.substr(logger_type.length() + 1, - logger_key.length() - logger_type.length()), "::")) { + for (auto const & name : utils::StringUtils::split(logger_key.substr(logger_type.length() + 1, logger_key.length() - logger_type.length()), "::")) { auto child_pair = current_namespace->children.find(name); std::shared_ptr<internal::LoggerNamespace> child; if (child_pair == current_namespace->children.end()) { @@ -190,8 +191,8 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration:: return root_namespace; } -std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<Logger> logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace, - const std::string &name, std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present) { +std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<Logger> logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string &name, + std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present) { std::shared_ptr<spdlog::logger> spdlogger = spdlog::get(name); if (spdlogger) { if (remove_if_present) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp index 3d21683..e46f740 100644 --- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp +++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp @@ -50,11 +50,8 @@ const char *SiteToSiteProvenanceReportingTask::ProvenanceAppStr = "MiNiFi Flow"; void SiteToSiteProvenanceReportingTask::initialize() { } -void SiteToSiteProvenanceReportingTask::getJsonReport( - core::ProcessContext *context, core::ProcessSession *session, - std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, - std::string &report) { - +void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, + std::string &report) { Json::Value array; for (auto record : records) { Json::Value recordJson; @@ -62,9 +59,7 @@ void SiteToSiteProvenanceReportingTask::getJsonReport( Json::Value parentUuidJson; Json::Value childUuidJson; recordJson["eventId"] = record->getEventId().c_str(); - recordJson["eventType"] = - provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record - ->getEventType()]; + recordJson["eventType"] = provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()]; recordJson["timestampMillis"] = record->getEventTime(); recordJson["durationMillis"] = record->getEventDuration(); recordJson["lineageStart"] = record->getlineageStartDate(); @@ -91,10 +86,8 @@ void SiteToSiteProvenanceReportingTask::getJsonReport( } recordJson["childIds"] = childUuidJson; recordJson["transitUri"] = record->getTransitUri().c_str(); - recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier() - .c_str(); - recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri() - .c_str(); + recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier().c_str(); + recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri().c_str(); recordJson["application"] = ProvenanceAppStr; array.append(recordJson); } @@ -103,14 +96,10 @@ void SiteToSiteProvenanceReportingTask::getJsonReport( report = writer.write(array); } -void SiteToSiteProvenanceReportingTask::onSchedule( - core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory) { +void SiteToSiteProvenanceReportingTask::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { } -void SiteToSiteProvenanceReportingTask::onTrigger( - core::ProcessContext *context, core::ProcessSession *session) { - +void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol(true); if (!protocol_) { @@ -121,18 +110,14 @@ void SiteToSiteProvenanceReportingTask::onTrigger( if (!protocol_->bootstrap()) { // bootstrap the client protocol if needeed context->yield(); - std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( - context->getProcessorNode().getProcessor()); - logger_->log_error("Site2Site bootstrap failed yield period %d peer ", - processor->getYieldPeriodMsec()); + std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor()); + logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec()); returnProtocol(std::move(protocol_)); return; } std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> records; - std::shared_ptr<provenance::ProvenanceRepository> repo = - std::static_pointer_cast<provenance::ProvenanceRepository>( - context->getProvenanceRepository()); + std::shared_ptr<provenance::ProvenanceRepository> repo = std::static_pointer_cast<provenance::ProvenanceRepository>(context->getProvenanceRepository()); repo->getProvenanceRecord(records, batch_size_); if (records.size() <= 0) { returnProtocol(std::move(protocol_)); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/repository/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp index 5f62f83..e6d561a 100644 --- a/libminifi/src/core/repository/FlowFileRepository.cpp +++ b/libminifi/src/core/repository/FlowFileRepository.cpp @@ -40,24 +40,19 @@ void FlowFileRepository::run() { leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<FlowFileRecord> eventRead = std::make_shared< - FlowFileRecord>(shared_from_this()); + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this()); std::string key = it->key().ToString(); - if (eventRead->DeSerialize( - reinterpret_cast<const uint8_t *>(it->value().data()), - it->value().size())) { + if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) { if ((curTime - eventRead->getEventTime()) > max_partition_millis_) purgeList.push_back(key); } else { - logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), - key.c_str()); + logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), key.c_str()); purgeList.push_back(key); } } delete it; for (auto eventId : purgeList) { - logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), - eventId.c_str()); + logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str()); Delete(eventId); } } @@ -74,19 +69,14 @@ void FlowFileRepository::loadComponent() { leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<FlowFileRecord> eventRead = - std::make_shared<FlowFileRecord>(shared_from_this()); + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this()); std::string key = it->key().ToString(); - if (eventRead->DeSerialize( - reinterpret_cast<const uint8_t *>(it->value().data()), - it->value().size())) { + if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) { auto search = connectionMap.find(eventRead->getConnectionUuid()); if (search != connectionMap.end()) { // we find the connection for the persistent flowfile, create the flowfile and enqueue that - std::shared_ptr<core::FlowFile> flow_file_ref = - std::static_pointer_cast<core::FlowFile>(eventRead); - std::shared_ptr<FlowFileRecord> record = - std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref); + std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead); + std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref); // set store to repo to true so that we do need to persistent again in enqueue record->setStoredToRepository(true); search->second->put(record); @@ -105,8 +95,7 @@ void FlowFileRepository::loadComponent() { std::vector<std::string>::iterator itPurge; for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) { std::string eventId = *itPurge; - logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), - eventId.c_str()); + logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str()); Delete(eventId); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/repository/VolatileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp index db036f8..a7e3a51 100644 --- a/libminifi/src/core/repository/VolatileRepository.cpp +++ b/libminifi/src/core/repository/VolatileRepository.cpp @@ -28,8 +28,7 @@ namespace minifi { namespace core { namespace repository { -const char *VolatileRepository::volatile_repo_max_count = - "max.count"; +const char *VolatileRepository::volatile_repo_max_count = "max.count"; void VolatileRepository::run() { repo_full_ = false; @@ -45,9 +44,7 @@ void VolatileRepository::purge() { RepoValue value; if (ent->getValue(value)) { current_size_ -= value.size(); - logger_->log_info("VolatileRepository -- purge %s %d %d %d", - value.getKey(), current_size_.load(), max_size_, - current_index_.load()); + logger_->log_info("VolatileRepository -- purge %s %d %d %d", value.getKey(), current_size_.load(), max_size_, current_index_.load()); } if (current_size_ < max_size_) break;
