http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp new file mode 100644 index 0000000..e5703d1 --- /dev/null +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -0,0 +1,143 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/ConfigurableComponent.h" + +#include "core/Property.h" +#include "core/logging/Logger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +ConfigurableComponent::ConfigurableComponent(std::shared_ptr<logging::Logger> logger) + : logger_(logger) { + +} + +ConfigurableComponent::ConfigurableComponent( + const ConfigurableComponent &&other) + : properties_(std::move(other.properties_)), + logger_(std::move(other.logger_)) { + +} +ConfigurableComponent::~ConfigurableComponent() { + +} + +/** + * Get property using the provided name. + * @param name property name. + * @param value value passed in by reference + * @return result of getting property. + */ +bool ConfigurableComponent::getProperty(const std::string name, + std::string &value) { + std::lock_guard<std::mutex> lock(configuration_mutex_); + + auto &&it = properties_.find(name); + + if (it != properties_.end()) { + Property item = it->second; + value = item.getValue(); + logger_->log_info("Processor %s property name %s value %s", name.c_str(), + item.getName().c_str(), value.c_str()); + return true; + } else { + return false; + } +} +/** + * Sets the property using the provided name + * @param property name + * @param value property value. + * @return result of setting property. + */ +bool ConfigurableComponent::setProperty(const std::string name, + std::string value) { + std::lock_guard<std::mutex> lock(configuration_mutex_); + auto &&it = properties_.find(name); + + if (it != properties_.end()) { + Property item = it->second; + item.setValue(value); + properties_[item.getName()] = item; + logger_->log_info("Component %s property name %s value %s", name.c_str(), + item.getName().c_str(), value.c_str()); + return true; + } else { + return false; + } +} + +/** + * Sets the property using the provided name + * @param property name + * @param value property value. + * @return whether property was set or not + */ +bool ConfigurableComponent::setProperty(Property &prop, std::string value) { + std::lock_guard<std::mutex> lock(configuration_mutex_); + auto it = properties_.find(prop.getName()); + + if (it != properties_.end()) { + Property item = it->second; + item.setValue(value); + properties_[item.getName()] = item; + logger_->log_info("property name %s value %s", prop.getName().c_str(), + item.getName().c_str(), value.c_str()); + return true; + } else { + Property newProp(prop); + newProp.setValue(value); + properties_.insert( + std::pair<std::string, Property>(prop.getName(), newProp)); + return true; + + } + return false; +} + +/** + * Sets supported properties for the ConfigurableComponent + * @param supported properties + * @return result of set operation. + */ +bool ConfigurableComponent::setSupportedProperties( + std::set<Property> properties) { + if (!canEdit()) { + return false; + } + + std::lock_guard<std::mutex> lock(configuration_mutex_); + + properties_.clear(); + for (auto item : properties) { + properties_[item.getName()] = item; + } + + return true; +} + +} /* namespace components */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ConfigurationFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp new file mode 100644 index 0000000..52bde69 --- /dev/null +++ b/libminifi/src/core/ConfigurationFactory.cpp @@ -0,0 +1,81 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/ConfigurationFactory.h" +#include "core/FlowConfiguration.h" +#include <type_traits> +#ifdef YAML_SUPPORT +#include "core/yaml/YamlConfiguration.h" +#endif + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +#ifndef YAML_SUPPORT + class YamlConfiguration; +#endif + + std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( + std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo, + const std::string configuration_class_name, const std::string path, + bool fail_safe) { + + std::string class_name_lc = configuration_class_name; + std::transform(class_name_lc.begin(), class_name_lc.end(), + class_name_lc.begin(), ::tolower); + try { + + if (class_name_lc == "flowconfiguration") { + // load the base configuration. + return std::unique_ptr<core::FlowConfiguration>( + new core::FlowConfiguration(repo, flow_file_repo, path)); + + } else if (class_name_lc == "yamlconfiguration") { + // only load if the class is defined. + return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, path)); + + + } else { + if (fail_safe) { + return std::unique_ptr<core::FlowConfiguration>( + new core::FlowConfiguration(repo, flow_file_repo, path)); + } else { + throw std::runtime_error( + "Support for the provided configuration class could not be found"); + } + } + } catch (const std::runtime_error &r) { + if (fail_safe) { + return std::unique_ptr<core::FlowConfiguration>( + new core::FlowConfiguration(repo, flow_file_repo, path)); + } + } + + throw std::runtime_error( + "Support for the provided configuration class could not be found"); + } + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Connectable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp new file mode 100644 index 0000000..ac61568 --- /dev/null +++ b/libminifi/src/core/Connectable.cpp @@ -0,0 +1,174 @@ +/* + * Connectable.cpp + * + * Created on: Feb 27, 2017 + * Author: mparisi + */ + +#include "../../include/core/Connectable.h" + +#include <uuid/uuid.h> +#include "core/logging/Logger.h" +#include "core/Relationship.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +Connectable::Connectable(std::string name, uuid_t uuid) + : CoreComponent(name, uuid), + max_concurrent_tasks_(1) { + +} + +Connectable::Connectable(const Connectable &&other) + : CoreComponent(std::move(other)), + max_concurrent_tasks_(std::move(other.max_concurrent_tasks_)) { + has_work_ = other.has_work_.load(); + strategy_ = other.strategy_.load(); +} + +Connectable::~Connectable() { + +} + +bool Connectable::setSupportedRelationships( + std::set<core::Relationship> relationships) { + if (isRunning()) { + logger_->log_info( + "Can not set processor supported relationship while the process %s is running", + name_.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(relationship_mutex_); + + relationships_.clear(); + for (auto item : relationships) { + relationships_[item.getName()] = item; + logger_->log_info("Processor %s supported relationship name %s", + name_.c_str(), item.getName().c_str()); + } + + return true; +} + +// Whether the relationship is supported +bool Connectable::isSupportedRelationship(core::Relationship relationship) { + const bool requiresLock = isRunning(); + + const auto conditionalLock = + !requiresLock ? + std::unique_lock<std::mutex>() : + std::unique_lock<std::mutex>(relationship_mutex_); + + const auto &it = relationships_.find(relationship.getName()); + if (it != relationships_.end()) { + return true; + } else { + return false; + } +} + +bool Connectable::setAutoTerminatedRelationships( + std::set<Relationship> relationships) { + if (isRunning()) { + logger_->log_info( + "Can not set processor auto terminated relationship while the process %s is running", + name_.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(relationship_mutex_); + + auto_terminated_relationships_.clear(); + for (auto item : relationships) { + auto_terminated_relationships_[item.getName()] = item; + logger_->log_info("Processor %s auto terminated relationship name %s", + name_.c_str(), item.getName().c_str()); + } + + return true; +} + +// Check whether the relationship is auto terminated +bool Connectable::isAutoTerminated(core::Relationship relationship) { + const bool requiresLock = isRunning(); + + const auto conditionalLock = + !requiresLock ? + std::unique_lock<std::mutex>() : + std::unique_lock<std::mutex>(relationship_mutex_); + + const auto &it = auto_terminated_relationships_.find(relationship.getName()); + if (it != auto_terminated_relationships_.end()) { + return true; + } else { + return false; + } +} + +void Connectable::waitForWork(uint64_t timeoutMs) { + has_work_.store(isWorkAvailable()); + + if (!has_work_.load()) { + std::unique_lock<std::mutex> lock(work_available_mutex_); + work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), + [&] {return has_work_.load();}); + } + +} + +void Connectable::notifyWork() { + // Do nothing if we are not event-driven + if (strategy_ != EVENT_DRIVEN) { + return; + } + + { + has_work_.store(isWorkAvailable()); + + if (has_work_.load()) { + work_condition_.notify_one(); + } + } + +} + +std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections( + std::string relationship) { + std::set<std::shared_ptr<Connectable>> empty; + + auto &&it = _outGoingConnections.find(relationship); + if (it != _outGoingConnections.end()) { + return _outGoingConnections[relationship]; + } else { + return empty; + } +} + +std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() { + std::lock_guard<std::mutex> lock(relationship_mutex_); + + if (_incomingConnections.size() == 0) + return NULL; + + if (incoming_connections_Iter == _incomingConnections.end()) + incoming_connections_Iter = _incomingConnections.begin(); + + std::shared_ptr<Connectable> ret = *incoming_connections_Iter; + incoming_connections_Iter++; + + if (incoming_connections_Iter == _incomingConnections.end()) + incoming_connections_Iter = _incomingConnections.begin(); + + return ret; +} + +} /* namespace components */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Core.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp new file mode 100644 index 0000000..39969f6 --- /dev/null +++ b/libminifi/src/core/Core.cpp @@ -0,0 +1,51 @@ +/* + * Core.cpp + * + * Created on: Mar 10, 2017 + * Author: mparisi + */ + +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// Set UUID +void CoreComponent::setUUID(uuid_t uuid) { + uuid_copy(uuid_, uuid); + char uuidStr[37]; + uuid_unparse_lower(uuid_, uuidStr); + uuidStr_ = uuidStr; +} +// Get UUID +bool CoreComponent::getUUID(uuid_t uuid) { + if (uuid) { + uuid_copy(uuid, uuid_); + return true; + } else { + return false; + } +} + +// Get UUID +unsigned const char *CoreComponent::getUUID() { + return uuid_; +} + +// Set Processor Name +void CoreComponent::setName(const std::string name) { + name_ = name; + +} +// Get Process Name +std::string CoreComponent::getName() { + return name_; +} +} +} +} +} +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp new file mode 100644 index 0000000..c6472cc --- /dev/null +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -0,0 +1,110 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/FlowConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +FlowConfiguration::~FlowConfiguration() { + +} + +std::shared_ptr<core::Processor> FlowConfiguration::createProcessor( + std::string name, uuid_t uuid) { + std::shared_ptr<core::Processor> processor = nullptr; + if (name + == org::apache::nifi::minifi::processors::GenerateFlowFile::ProcessorName) { + processor = std::make_shared< + org::apache::nifi::minifi::processors::GenerateFlowFile>(name, uuid); + } else if (name + == org::apache::nifi::minifi::processors::LogAttribute::ProcessorName) { + processor = std::make_shared< + org::apache::nifi::minifi::processors::LogAttribute>(name, uuid); + } else if (name + == org::apache::nifi::minifi::processors::RealTimeDataCollector::ProcessorName) { + processor = std::make_shared< + org::apache::nifi::minifi::processors::RealTimeDataCollector>(name, + uuid); + } else if (name + == org::apache::nifi::minifi::processors::GetFile::ProcessorName) { + processor = + std::make_shared<org::apache::nifi::minifi::processors::GetFile>(name, + uuid); + } else if (name + == org::apache::nifi::minifi::processors::PutFile::ProcessorName) { + processor = + std::make_shared<org::apache::nifi::minifi::processors::PutFile>(name, + uuid); + } else if (name + == org::apache::nifi::minifi::processors::TailFile::ProcessorName) { + processor = + std::make_shared<org::apache::nifi::minifi::processors::TailFile>(name, + uuid); + } else if (name + == org::apache::nifi::minifi::processors::ListenSyslog::ProcessorName) { + processor = std::make_shared< + org::apache::nifi::minifi::processors::ListenSyslog>(name, uuid); + } else if (name + == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) { + processor = std::make_shared< + org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid); + } else if (name + == org::apache::nifi::minifi::processors::ExecuteProcess::ProcessorName) { + processor = std::make_shared< + org::apache::nifi::minifi::processors::ExecuteProcess>(name, uuid); + } else if (name + == org::apache::nifi::minifi::processors::AppendHostInfo::ProcessorName) { + processor = std::make_shared< + org::apache::nifi::minifi::processors::AppendHostInfo>(name, uuid); + } else { + logger_->log_error("No Processor defined for %s", name.c_str()); + return nullptr; + } + + // initialize the processor + processor->initialize(); + + return processor; +} + +std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup( + std::string name, uuid_t uuid) { + return std::unique_ptr<core::ProcessGroup>( + new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid)); +} + +std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup( + std::string name, uuid_t uuid) { + return std::unique_ptr<core::ProcessGroup>( + new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid)); +} + +std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection( + std::string name, uuid_t uuid) { + return std::make_shared<minifi::Connection>(flow_file_repo_, name, uuid); +} + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp new file mode 100644 index 0000000..baa3ebd --- /dev/null +++ b/libminifi/src/core/ProcessGroup.cpp @@ -0,0 +1,312 @@ +/** + * @file ProcessGroup.cpp + * ProcessGroup class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> + +#include "core/ProcessGroup.h" +#include "core/Processor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, + ProcessGroup *parent) + : name_(name), + type_(type), + parent_process_group_(parent) { + if (!uuid) + // Generate the global UUID for the flow record + uuid_generate(uuid_); + else + uuid_copy(uuid_, uuid); + + yield_period_msec_ = 0; + transmitting_ = false; + + logger_ = logging::Logger::getLogger(); + logger_->log_info("ProcessGroup %s created", name_.c_str()); +} + +ProcessGroup::~ProcessGroup() { + for (auto &&connection : connections_) { + connection->drain(); + } + + for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); + it != child_process_groups_.end(); ++it) { + ProcessGroup *processGroup(*it); + delete processGroup; + } + +} + +bool ProcessGroup::isRootProcessGroup() { + std::lock_guard<std::mutex> lock(mutex_); + return (type_ == ROOT_PROCESS_GROUP); +} + +void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) { + std::lock_guard<std::mutex> lock(mutex_); + + if (processors_.find(processor) == processors_.end()) { + // We do not have the same processor in this process group yet + processors_.insert(processor); + logger_->log_info("Add processor %s into process group %s", + processor->getName().c_str(), name_.c_str()); + } +} + +void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) { + std::lock_guard<std::mutex> lock(mutex_); + + if (processors_.find(processor) != processors_.end()) { + // We do have the same processor in this process group yet + processors_.erase(processor); + logger_->log_info("Remove processor %s from process group %s", + processor->getName().c_str(), name_.c_str()); + } +} + +void ProcessGroup::addProcessGroup(ProcessGroup *child) { + std::lock_guard<std::mutex> lock(mutex_); + + if (child_process_groups_.find(child) == child_process_groups_.end()) { + // We do not have the same child process group in this process group yet + child_process_groups_.insert(child); + logger_->log_info("Add child process group %s into process group %s", + child->getName().c_str(), name_.c_str()); + } +} + +void ProcessGroup::removeProcessGroup(ProcessGroup *child) { + std::lock_guard<std::mutex> lock(mutex_); + + if (child_process_groups_.find(child) != child_process_groups_.end()) { + // We do have the same child process group in this process group yet + child_process_groups_.erase(child); + logger_->log_info("Remove child process group %s from process group %s", + child->getName().c_str(), name_.c_str()); + } +} + +void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, + EventDrivenSchedulingAgent *eventScheduler) { + std::lock_guard<std::mutex> lock(mutex_); + + try { + // Start all the processor node, input and output ports + for (auto processor : processors_) { + logger_->log_debug("Starting %s", processor->getName().c_str()); + + if (!processor->isRunning() + && processor->getScheduledState() != DISABLED) { + if (processor->getSchedulingStrategy() == TIMER_DRIVEN) + timeScheduler->schedule(processor); + else if (processor->getSchedulingStrategy() == EVENT_DRIVEN) + eventScheduler->schedule(processor); + } + } + // Start processing the group + for (auto processGroup : child_process_groups_) { + processGroup->startProcessing(timeScheduler, eventScheduler); + } + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + logger_->log_debug( + "Caught Exception during process group start processing"); + throw; + } +} + +void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, + EventDrivenSchedulingAgent *eventScheduler) { + std::lock_guard<std::mutex> lock(mutex_); + + try { + // Stop all the processor node, input and output ports + for (std::set<std::shared_ptr<Processor> >::iterator it = + processors_.begin(); it != processors_.end(); ++it) { + std::shared_ptr<Processor> processor(*it); + if (processor->getSchedulingStrategy() == TIMER_DRIVEN) + timeScheduler->unschedule(processor); + else if (processor->getSchedulingStrategy() == EVENT_DRIVEN) + eventScheduler->unschedule(processor); + } + + for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); + it != child_process_groups_.end(); ++it) { + ProcessGroup *processGroup(*it); + processGroup->stopProcessing(timeScheduler, eventScheduler); + } + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + logger_->log_debug("Caught Exception during process group stop processing"); + throw; + } +} + +std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { + + std::shared_ptr<Processor> ret = NULL; + // std::lock_guard<std::mutex> lock(mutex_); + + for (auto processor : processors_) { + logger_->log_info("find processor %s", processor->getName().c_str()); + uuid_t processorUUID; + + if (processor->getUUID(processorUUID)) { + + char uuid_str[37]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0" + uuid_unparse_lower(processorUUID, uuid_str); + std::string processorUUIDstr = uuid_str; + uuid_unparse_lower(uuid, uuid_str); + std::string uuidStr = uuid_str; + if (processorUUIDstr == uuidStr) { + return processor; + } + } + + } + for (auto processGroup : child_process_groups_) { + + logger_->log_info("find processor child %s", + processGroup->getName().c_str()); + std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid); + if (processor) + return processor; + } + + return ret; +} + +std::shared_ptr<Processor> ProcessGroup::findProcessor( + const std::string &processorName) { + std::shared_ptr<Processor> ret = NULL; + + for (auto processor : processors_) { + logger_->log_debug("Current processor is %s", processor->getName().c_str()); + if (processor->getName() == processorName) + return processor; + } + + for (auto processGroup : child_process_groups_) { + std::shared_ptr<Processor> processor = processGroup->findProcessor( + processorName); + if (processor) + return processor; + } + + return ret; +} + +void ProcessGroup::updatePropertyValue(std::string processorName, + std::string propertyName, + std::string propertyValue) { + std::lock_guard<std::mutex> lock(mutex_); + + for (auto processor : processors_) { + if (processor->getName() == processorName) { + processor->setProperty(propertyName, propertyValue); + } + } + + for (auto processGroup : child_process_groups_) { + processGroup->updatePropertyValue(processorName, propertyName, + propertyValue); + } + + return; +} + +void ProcessGroup::getConnections( + std::map<std::string, std::shared_ptr<Connection>> &connectionMap) { + for (auto connection : connections_) { + connectionMap[connection->getUUIDStr()] = connection; + } + + for (auto processGroup : child_process_groups_) { + processGroup->getConnections(connectionMap); + } +} + +void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { + std::lock_guard<std::mutex> lock(mutex_); + + if (connections_.find(connection) == connections_.end()) { + // We do not have the same connection in this process group yet + connections_.insert(connection); + logger_->log_info("Add connection %s into process group %s", + connection->getName().c_str(), name_.c_str()); + uuid_t sourceUUID; + std::shared_ptr<Processor> source = NULL; + connection->getSourceUUID(sourceUUID); + source = this->findProcessor(sourceUUID); + if (source) + source->addConnection(connection); + std::shared_ptr<Processor> destination = NULL; + uuid_t destinationUUID; + connection->getDestinationUUID(destinationUUID); + destination = this->findProcessor(destinationUUID); + if (destination && destination != source) + destination->addConnection(connection); + } +} + +void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) { + std::lock_guard<std::mutex> lock(mutex_); + + if (connections_.find(connection) != connections_.end()) { + // We do not have the same connection in this process group yet + connections_.erase(connection); + logger_->log_info("Remove connection %s into process group %s", + connection->getName().c_str(), name_.c_str()); + uuid_t sourceUUID; + std::shared_ptr<Processor> source = NULL; + connection->getSourceUUID(sourceUUID); + source = this->findProcessor(sourceUUID); + if (source) + source->removeConnection(connection); + std::shared_ptr<Processor> destination = NULL; + uuid_t destinationUUID; + connection->getDestinationUUID(destinationUUID); + destination = this->findProcessor(destinationUUID); + if (destination && destination != source) + destination->removeConnection(connection); + } +} + +} /* namespace processor */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp new file mode 100644 index 0000000..e6fa7c4 --- /dev/null +++ b/libminifi/src/core/ProcessSession.cpp @@ -0,0 +1,941 @@ +/** + * @file ProcessSession.cpp + * ProcessSession class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <iostream> + +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +std::shared_ptr<core::FlowFile> ProcessSession::create() { + std::map<std::string, std::string> empty; + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), + empty); + + _addedFlowFiles[record->getUUIDStr()] = record; + logger_->log_debug("Create FlowFile with UUID %s", + record->getUUIDStr().c_str()); + std::string details = process_context_->getProcessorNode().getName() + + " creates flow record " + record->getUUIDStr(); + provenance_report_->create(record, details); + + return record; +} + +std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &parent) { + std::map<std::string, std::string> empty; + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), + empty); + + if (record) { + _addedFlowFiles[record->getUUIDStr()] = record; + logger_->log_debug("Create FlowFile with UUID %s", + record->getUUIDStr().c_str()); + } + + if (record) { + // Copy attributes + std::map<std::string, std::string> parentAttributes = + parent->getAttributes(); + std::map<std::string, std::string>::iterator it; + for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) { + if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) + || it->first == FlowAttributeKey(DISCARD_REASON) + || it->first == FlowAttributeKey(UUID)) + // Do not copy special attributes from parent + continue; + record->setAttribute(it->first, it->second); + } + record->setLineageStartDate(parent->getlineageStartDate()); + record->setLineageIdentifiers(parent->getlineageIdentifiers()); + parent->getlineageIdentifiers().insert(parent->getUUIDStr()); + + } + return record; +} + +std::shared_ptr<core::FlowFile> ProcessSession::clone( + std::shared_ptr<core::FlowFile> &parent) { + std::shared_ptr<core::FlowFile> record = this->create(parent); + if (record) { + // Copy Resource Claim + std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim(); + record->setResourceClaim(parent_claim); + if (parent_claim != nullptr) { + record->setOffset(parent->getOffset()); + record->setSize(parent->getSize()); + record->getResourceClaim()->increaseFlowFileRecordOwnedCount(); + ; + } + provenance_report_->clone(parent, record); + } + return record; +} + +std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer( + std::shared_ptr<core::FlowFile> &parent) { + std::map<std::string, std::string> empty; + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), + empty); + + if (record) { + this->_clonedFlowFiles[record->getUUIDStr()] = record; + logger_->log_debug("Clone FlowFile with UUID %s during transfer", + record->getUUIDStr().c_str()); + // Copy attributes + std::map<std::string, std::string> parentAttributes = + parent->getAttributes(); + std::map<std::string, std::string>::iterator it; + for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) { + if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) + || it->first == FlowAttributeKey(DISCARD_REASON) + || it->first == FlowAttributeKey(UUID)) + // Do not copy special attributes from parent + continue; + record->setAttribute(it->first, it->second); + } + record->setLineageStartDate(parent->getlineageStartDate()); + + record->setLineageIdentifiers(parent->getlineageIdentifiers()); + record->getlineageIdentifiers().insert(parent->getUUIDStr()); + + // Copy Resource Claim + std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim(); + record->setResourceClaim(parent_claim); + if (parent_claim != nullptr) { + record->setOffset(parent->getOffset()); + record->setSize(parent->getSize()); + record->getResourceClaim()->increaseFlowFileRecordOwnedCount(); + ; + } + provenance_report_->clone(parent, record); + } + + return record; +} + +std::shared_ptr<core::FlowFile> ProcessSession::clone( + std::shared_ptr<core::FlowFile> &parent, long offset, long size) { + std::shared_ptr<core::FlowFile> record = this->create(parent); + if (record) { + + if (parent->getResourceClaim()) { + if ((offset + size) > (long) parent->getSize()) { + // Set offset and size + logger_->log_error("clone offset %d and size %d exceed parent size %d", + offset, size, parent->getSize()); + // Remove the Add FlowFile for the session + std::map<std::string, std::shared_ptr<core::FlowFile> >::iterator it = + this->_addedFlowFiles.find(record->getUUIDStr()); + if (it != this->_addedFlowFiles.end()) + this->_addedFlowFiles.erase(record->getUUIDStr()); + return nullptr; + } + record->setOffset(parent->getOffset() + parent->getOffset()); + record->setSize(size); + // Copy Resource Claim + std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim(); + record->setResourceClaim(parent_claim); + if (parent_claim != nullptr) { + + record->getResourceClaim()->increaseFlowFileRecordOwnedCount(); + } + } + provenance_report_->clone(parent, record); + } + return record; +} + +void ProcessSession::remove(std::shared_ptr<core::FlowFile> &flow) { + flow->setDeleted(true); + _deletedFlowFiles[flow->getUUIDStr()] = flow; + std::string reason = process_context_->getProcessorNode().getName() + + " drop flow record " + flow->getUUIDStr(); + provenance_report_->drop(flow, reason); +} + +void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) { + flow->setDeleted(true); + _deletedFlowFiles[flow->getUUIDStr()] = flow; + std::string reason = process_context_->getProcessorNode().getName() + + " drop flow record " + flow->getUUIDStr(); + provenance_report_->drop(flow, reason); +} + +void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, + std::string key, std::string value) { + flow->setAttribute(key, value); + std::string details = process_context_->getProcessorNode().getName() + + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + + value; + provenance_report_->modifyAttributes(flow, details); +} + +void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, + std::string key) { + flow->removeAttribute(key); + std::string details = process_context_->getProcessorNode().getName() + + " remove flow record " + flow->getUUIDStr() + " attribute " + key; + provenance_report_->modifyAttributes(flow, details); +} + +void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, + std::string key, std::string value) { + flow->setAttribute(key, value); + std::string details = process_context_->getProcessorNode().getName() + + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + + value; + provenance_report_->modifyAttributes(flow, details); +} + +void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, + std::string key) { + flow->removeAttribute(key); + std::string details = process_context_->getProcessorNode().getName() + + " remove flow record " + flow->getUUIDStr() + " attribute " + key; + provenance_report_->modifyAttributes(flow, details); +} + +void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) { + flow->setPenaltyExpiration( + getTimeMillis() + + process_context_->getProcessorNode().getPenalizationPeriodMsec()); +} + +void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &&flow) { + flow->setPenaltyExpiration( + getTimeMillis() + + process_context_->getProcessorNode().getPenalizationPeriodMsec()); +} + +void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow, + Relationship relationship) { + _transferRelationship[flow->getUUIDStr()] = relationship; +} + +void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow, + Relationship relationship) { + _transferRelationship[flow->getUUIDStr()] = relationship; +} + +void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, + OutputStreamCallback *callback) { + std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>( + DEFAULT_CONTENT_DIRECTORY); + + try { + std::ofstream fs; + uint64_t startTime = getTimeMillis(); + fs.open(claim->getContentFullPath().c_str(), + std::fstream::out | std::fstream::binary | std::fstream::trunc); + if (fs.is_open()) { + // Call the callback to write the content + callback->process(&fs); + if (fs.good() && fs.tellp() >= 0) { + flow->setSize(fs.tellp()); + flow->setOffset(0); + std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim(); + if (flow_claim != nullptr) { + // Remove the old claim + flow_claim->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + flow->setResourceClaim(claim); + claim->increaseFlowFileRecordOwnedCount(); + /* + logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + std::string details = process_context_->getProcessorNode().getName() + + " modify flow record content " + flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details, endTime - startTime); + } else { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); + } + } else { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } catch (std::exception &exception) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception during process session write"); + throw; + } +} + +void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, + OutputStreamCallback *callback) { + std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); + try { + std::ofstream fs; + uint64_t startTime = getTimeMillis(); + fs.open(claim->getContentFullPath().c_str(), + std::fstream::out | std::fstream::binary | std::fstream::trunc); + if (fs.is_open()) { + // Call the callback to write the content + callback->process(&fs); + if (fs.good() && fs.tellp() >= 0) { + flow->setSize(fs.tellp()); + flow->setOffset(0); + std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim(); + if (flow_claim != nullptr) { + // Remove the old claim + flow_claim->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + flow->setResourceClaim(claim); + claim->increaseFlowFileRecordOwnedCount(); + /* + logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + std::string details = process_context_->getProcessorNode().getName() + + " modify flow record content " + flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details, endTime - startTime); + } else { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); + } + } else { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } catch (std::exception &exception) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception during process session write"); + throw; + } +} + +void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, + OutputStreamCallback *callback) { + std::shared_ptr<ResourceClaim> claim = nullptr; + + if (flow->getResourceClaim() == nullptr) { + // No existed claim for append, we need to create new claim + return write(flow, callback); + } + + claim = flow->getResourceClaim(); + + try { + std::ofstream fs; + uint64_t startTime = getTimeMillis(); + fs.open(claim->getContentFullPath().c_str(), + std::fstream::out | std::fstream::binary | std::fstream::app); + if (fs.is_open()) { + // Call the callback to write the content + std::streampos oldPos = fs.tellp(); + callback->process(&fs); + if (fs.good() && fs.tellp() >= 0) { + uint64_t appendSize = fs.tellp() - oldPos; + flow->setSize(flow->getSize() + appendSize); + /* + logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", + flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + std::string details = process_context_->getProcessorNode().getName() + + " modify flow record content " + flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details, endTime - startTime); + } else { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); + } + } else { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + logger_->log_debug("Caught Exception during process session append"); + throw; + } +} + +void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, + OutputStreamCallback *callback) { + std::shared_ptr<ResourceClaim> claim = nullptr; + + if (flow->getResourceClaim() == nullptr) { + // No existed claim for append, we need to create new claim + return write(flow, callback); + } + + claim = flow->getResourceClaim(); + + try { + std::ofstream fs; + uint64_t startTime = getTimeMillis(); + fs.open(claim->getContentFullPath().c_str(), + std::fstream::out | std::fstream::binary | std::fstream::app); + if (fs.is_open()) { + // Call the callback to write the content + std::streampos oldPos = fs.tellp(); + callback->process(&fs); + if (fs.good() && fs.tellp() >= 0) { + uint64_t appendSize = fs.tellp() - oldPos; + flow->setSize(flow->getSize() + appendSize); + /* + logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", + flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + std::string details = process_context_->getProcessorNode().getName() + + " modify flow record content " + flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details, endTime - startTime); + } else { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); + } + } else { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + logger_->log_debug("Caught Exception during process session append"); + throw; + } +} + +void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, + InputStreamCallback *callback) { + try { + std::shared_ptr<ResourceClaim> claim = nullptr; + + if (flow->getResourceClaim() == nullptr) { + // No existed claim for read, we throw exception + throw Exception(FILE_OPERATION_EXCEPTION, + "No Content Claim existed for read"); + } + + claim = flow->getResourceClaim(); + std::ifstream fs; + fs.open(claim->getContentFullPath().c_str(), + std::fstream::in | std::fstream::binary); + if (fs.is_open()) { + fs.seekg(flow->getOffset(), fs.beg); + + if (fs.good()) { + callback->process(&fs); + /* + logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s", + flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + } else { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error"); + } + } else { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + logger_->log_debug("Caught Exception during process session read"); + throw; + } +} + +void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, + InputStreamCallback *callback) { + try { + std::shared_ptr<ResourceClaim> claim = nullptr; + + if (flow->getResourceClaim() == nullptr) { + // No existed claim for read, we throw exception + throw Exception(FILE_OPERATION_EXCEPTION, + "No Content Claim existed for read"); + } + + claim = flow->getResourceClaim(); + std::ifstream fs; + fs.open(claim->getContentFullPath().c_str(), + std::fstream::in | std::fstream::binary); + if (fs.is_open()) { + fs.seekg(flow->getOffset(), fs.beg); + + if (fs.good()) { + callback->process(&fs); + /* + logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s", + flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + } else { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error"); + } + } else { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + logger_->log_debug("Caught Exception during process session read"); + throw; + } +} + +void ProcessSession::import(std::string source, + std::shared_ptr<core::FlowFile> &flow, + bool keepSource, uint64_t offset) { + std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); + char *buf = NULL; + int size = 4096; + buf = new char[size]; + + try { + std::ofstream fs; + uint64_t startTime = getTimeMillis(); + fs.open(claim->getContentFullPath().c_str(), + std::fstream::out | std::fstream::binary | std::fstream::trunc); + std::ifstream input; + input.open(source.c_str(), std::fstream::in | std::fstream::binary); + + if (fs.is_open() && input.is_open()) { + // Open the source file and stream to the flow file + input.seekg(offset, fs.beg); + while (input.good()) { + input.read(buf, size); + if (input) + fs.write(buf, size); + else + fs.write(buf, input.gcount()); + } + + if (fs.good() && fs.tellp() >= 0) { + flow->setSize(fs.tellp()); + flow->setOffset(0); + if (flow->getResourceClaim() != nullptr) { + // Remove the old claim + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + flow->setResourceClaim(claim); + claim->increaseFlowFileRecordOwnedCount(); + + logger_->log_debug( + "Import offset %d length %d into content %s for FlowFile UUID %s", + flow->getOffset(), flow->getSize(), + flow->getResourceClaim()->getContentFullPath().c_str(), + flow->getUUIDStr().c_str()); + + fs.close(); + input.close(); + if (!keepSource) + std::remove(source.c_str()); + std::string details = process_context_->getProcessorNode().getName() + + " modify flow record content " + flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details, endTime - startTime); + } else { + fs.close(); + input.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + } else { + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + + delete[] buf; + } catch (std::exception &exception) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception %s", exception.what()); + delete[] buf; + throw; + } catch (...) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception during process session write"); + delete[] buf; + throw; + } +} + +void ProcessSession::import(std::string source, + std::shared_ptr<core::FlowFile> &&flow, + bool keepSource, uint64_t offset) { + std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); + + char *buf = NULL; + int size = 4096; + buf = new char[size]; + + try { + std::ofstream fs; + uint64_t startTime = getTimeMillis(); + fs.open(claim->getContentFullPath().c_str(), + std::fstream::out | std::fstream::binary | std::fstream::trunc); + std::ifstream input; + input.open(source.c_str(), std::fstream::in | std::fstream::binary); + + if (fs.is_open() && input.is_open()) { + // Open the source file and stream to the flow file + input.seekg(offset, fs.beg); + while (input.good()) { + input.read(buf, size); + if (input) + fs.write(buf, size); + else + fs.write(buf, input.gcount()); + } + + if (fs.good() && fs.tellp() >= 0) { + flow->setSize(fs.tellp()); + flow->setOffset(0); + if (flow->getResourceClaim() != nullptr) { + // Remove the old claim + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + flow->setResourceClaim(claim); + claim->increaseFlowFileRecordOwnedCount(); + + logger_->log_debug( + "Import offset %d length %d into content %s for FlowFile UUID %s", + flow->getOffset(), flow->getSize(), + flow->getResourceClaim()->getContentFullPath().c_str(), + flow->getUUIDStr().c_str()); + + fs.close(); + input.close(); + if (!keepSource) + std::remove(source.c_str()); + std::string details = process_context_->getProcessorNode().getName() + + " modify flow record content " + flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details, endTime - startTime); + } else { + fs.close(); + input.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + } else { + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + + delete[] buf; + } catch (std::exception &exception) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception %s", exception.what()); + delete[] buf; + throw; + } catch (...) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception during process session write"); + delete[] buf; + throw; + } +} + +void ProcessSession::commit() { + + try { + // First we clone the flow record based on the transfered relationship for updated flow record + for (auto && it : _updatedFlowFiles) { + std::shared_ptr<core::FlowFile> record = it.second; + if (record->isDeleted()) + continue; + std::map<std::string, Relationship>::iterator itRelationship = this + ->_transferRelationship.find(record->getUUIDStr()); + if (itRelationship != _transferRelationship.end()) { + Relationship relationship = itRelationship->second; + // Find the relationship, we need to find the connections for that relationship + std::set<std::shared_ptr<Connectable>> connections = process_context_ + ->getProcessorNode().getOutGoingConnections(relationship.getName()); + if (connections.empty()) { + // No connection + if (!process_context_->getProcessorNode().isAutoTerminated( + relationship)) { + // Not autoterminate, we should have the connect + std::string message = + "Connect empty for non auto terminated relationship" + + relationship.getName(); + throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); + } else { + // Autoterminated + remove(record); + } + } else { + // We connections, clone the flow and assign the connection accordingly + for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = + connections.begin(); itConnection != connections.end(); + ++itConnection) { + std::shared_ptr<Connectable> connection = *itConnection; + if (itConnection == connections.begin()) { + // First connection which the flow need be routed to + record->setConnection(connection); + } else { + // Clone the flow file and route to the connection + std::shared_ptr<core::FlowFile> cloneRecord; + cloneRecord = this->cloneDuringTransfer(record); + if (cloneRecord) + cloneRecord->setConnection(connection); + else + throw Exception(PROCESS_SESSION_EXCEPTION, + "Can not clone the flow for transfer"); + } + } + } + } else { + // Can not find relationship for the flow + throw Exception(PROCESS_SESSION_EXCEPTION, + "Can not find the transfer relationship for the flow"); + } + } + + // Do the samething for added flow file + for (const auto it : _addedFlowFiles) { + std::shared_ptr<core::FlowFile> record = it.second; + if (record->isDeleted()) + continue; + std::map<std::string, Relationship>::iterator itRelationship = this + ->_transferRelationship.find(record->getUUIDStr()); + if (itRelationship != _transferRelationship.end()) { + Relationship relationship = itRelationship->second; + // Find the relationship, we need to find the connections for that relationship + std::set<std::shared_ptr<Connectable>> connections = process_context_ + ->getProcessorNode().getOutGoingConnections(relationship.getName()); + if (connections.empty()) { + // No connection + if (!process_context_->getProcessorNode().isAutoTerminated( + relationship)) { + // Not autoterminate, we should have the connect + std::string message = + "Connect empty for non auto terminated relationship " + + relationship.getName(); + throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); + } else { + // Autoterminated + remove(record); + } + } else { + // We connections, clone the flow and assign the connection accordingly + for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = + connections.begin(); itConnection != connections.end(); + ++itConnection) { + std::shared_ptr<Connectable> connection(*itConnection); + if (itConnection == connections.begin()) { + // First connection which the flow need be routed to + record->setConnection(connection); + } else { + // Clone the flow file and route to the connection + std::shared_ptr<core::FlowFile> cloneRecord; + cloneRecord = this->cloneDuringTransfer(record); + if (cloneRecord) + cloneRecord->setConnection(connection); + else + throw Exception(PROCESS_SESSION_EXCEPTION, + "Can not clone the flow for transfer"); + } + } + } + } else { + // Can not find relationship for the flow + throw Exception(PROCESS_SESSION_EXCEPTION, + "Can not find the transfer relationship for the flow"); + } + } + + std::shared_ptr<Connection> connection = nullptr; + // Complete process the added and update flow files for the session, send the flow file to its queue + for (const auto &it : _updatedFlowFiles) { + std::shared_ptr<core::FlowFile> record = it.second; + if (record->isDeleted()) { + continue; + } + + connection = std::static_pointer_cast<Connection>( + record->getConnection()); + if ((connection) != nullptr) + connection->put(record); + } + for (const auto &it : _addedFlowFiles) { + std::shared_ptr<core::FlowFile> record = it.second; + if (record->isDeleted()) { + continue; + } + connection = std::static_pointer_cast<Connection>( + record->getConnection()); + if ((connection) != nullptr) + connection->put(record); + } + // Process the clone flow files + for (const auto &it : _clonedFlowFiles) { + std::shared_ptr<core::FlowFile> record = it.second; + if (record->isDeleted()) { + continue; + } + connection = std::static_pointer_cast<Connection>( + record->getConnection()); + if ((connection) != nullptr) + connection->put(record); + } + + // All done + _updatedFlowFiles.clear(); + _addedFlowFiles.clear(); + _clonedFlowFiles.clear(); + _deletedFlowFiles.clear(); + _originalFlowFiles.clear(); + // persistent the provenance report + this->provenance_report_->commit(); + logger_->log_trace("ProcessSession committed for %s", + process_context_->getProcessorNode().getName().c_str()); + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + logger_->log_debug("Caught Exception during process session commit"); + throw; + } +} + +void ProcessSession::rollback() { + try { + std::shared_ptr<Connection> connection = nullptr; + // Requeue the snapshot of the flowfile back + for (const auto &it : _originalFlowFiles) { + std::shared_ptr<core::FlowFile> record = it.second; + connection = std::static_pointer_cast<Connection>( + record->getOriginalConnection()); + if ((connection) != nullptr) { + std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast< + FlowFileRecord>(record); + flowf->setSnapShot(false); + connection->put(record); + } + + } + _originalFlowFiles.clear(); + + _clonedFlowFiles.clear(); + _addedFlowFiles.clear(); + _updatedFlowFiles.clear(); + _deletedFlowFiles.clear(); + logger_->log_trace("ProcessSession rollback for %s", + process_context_->getProcessorNode().getName().c_str()); + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + logger_->log_debug("Caught Exception during process session roll back"); + throw; + } +} + +std::shared_ptr<core::FlowFile> ProcessSession::get() { + std::shared_ptr<Connectable> first = process_context_->getProcessorNode() + .getNextIncomingConnection(); + + if (first == NULL) + return NULL; + + std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>( + first); + + do { + std::set<std::shared_ptr<core::FlowFile> > expired; + std::shared_ptr<core::FlowFile> ret = current->poll(expired); + if (expired.size() > 0) { + // Remove expired flow record + for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired + .begin(); it != expired.end(); ++it) { + std::shared_ptr<core::FlowFile> record = *it; + std::string details = process_context_->getProcessorNode().getName() + + " expire flow record " + record->getUUIDStr(); + provenance_report_->expire(record, details); + } + } + if (ret) { + // add the flow record to the current process session update map + ret->setDeleted(false); + _updatedFlowFiles[ret->getUUIDStr()] = ret; + std::map<std::string, std::string> empty; + std::shared_ptr<core::FlowFile> snapshot = + std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),empty); + logger_->log_debug("Create Snapshot FlowFile with UUID %s", + snapshot->getUUIDStr().c_str()); + snapshot = ret; +// snapshot->duplicate(ret); + // save a snapshot + _originalFlowFiles[snapshot->getUUIDStr()] = snapshot; + return ret; + } + current = std::static_pointer_cast<Connection>( + process_context_->getProcessorNode().getNextIncomingConnection()); + } while (current != NULL && current != first); + + return NULL; +} + +} /* namespace processor */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessSessionFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp new file mode 100644 index 0000000..445ca58 --- /dev/null +++ b/libminifi/src/core/ProcessSessionFactory.cpp @@ -0,0 +1,42 @@ +/** + * @file ProcessSessionFactory.cpp + * ProcessSessionFactory class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/ProcessSessionFactory.h" + +#include <memory> + + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() +{ + return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_)); +} + + +} /* namespace processor */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp new file mode 100644 index 0000000..ba52c28 --- /dev/null +++ b/libminifi/src/core/Processor.cpp @@ -0,0 +1,272 @@ +/** + * @file Processor.cpp + * Processor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <memory> +#include <functional> + +#include "core/Processor.h" + +#include "Connection.h" +#include "core/Connectable.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +Processor::Processor(std::string name, uuid_t uuid) + : Connectable(name, uuid), + ConfigurableComponent(logging::Logger::getLogger()) { + + has_work_.store(false); + // Setup the default values + state_ = DISABLED; + strategy_ = TIMER_DRIVEN; + loss_tolerant_ = false; + _triggerWhenEmpty = false; + scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS; + run_durantion_nano_ = 0; + yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000; + _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000; + max_concurrent_tasks_ = 1; + active_tasks_ = 0; + yield_expiration_ = 0; + incoming_connections_Iter = this->_incomingConnections.begin(); + logger_ = logging::Logger::getLogger(); + logger_->log_info("Processor %s created UUID %s", name_.c_str(), + uuidStr_.c_str()); +} + +bool Processor::isRunning() { + return (state_ == RUNNING && active_tasks_ > 0); +} + +void Processor::setScheduledState(ScheduledState state) { + state_ = state; +} + +bool Processor::addConnection(std::shared_ptr<Connectable> conn) { + + bool ret = false; + + if (isRunning()) { + logger_->log_info("Can not add connection while the process %s is running", + name_.c_str()); + return false; + } + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + std::lock_guard<std::mutex> lock(mutex_); + + uuid_t srcUUID; + uuid_t destUUID; + + connection->getSourceUUID(srcUUID); + connection->getDestinationUUID(destUUID); + char uuid_str[37]; + + uuid_unparse_lower(uuid_, uuid_str); + std::string my_uuid = uuid_str; + uuid_unparse_lower(destUUID, uuid_str); + std::string destination_uuid = uuid_str; + if (my_uuid == destination_uuid) { + // Connection is destination to the current processor + if (_incomingConnections.find(connection) == _incomingConnections.end()) { + _incomingConnections.insert(connection); + connection->setDestination(shared_from_this()); + logger_->log_info( + "Add connection %s into Processor %s incoming connection", + connection->getName().c_str(), name_.c_str()); + incoming_connections_Iter = this->_incomingConnections.begin(); + ret = true; + } + } + uuid_unparse_lower(srcUUID, uuid_str); + std::string source_uuid = uuid_str; + if (my_uuid == source_uuid) { + std::string relationship = connection->getRelationship().getName(); + // Connection is source from the current processor + auto &&it = _outGoingConnections.find(relationship); + if (it != _outGoingConnections.end()) { + // We already has connection for this relationship + std::set<std::shared_ptr<Connectable>> existedConnection = it->second; + if (existedConnection.find(connection) == existedConnection.end()) { + // We do not have the same connection for this relationship yet + existedConnection.insert(connection); + connection->setSource(shared_from_this()); + _outGoingConnections[relationship] = existedConnection; + logger_->log_info( + "Add connection %s into Processor %s outgoing connection for relationship %s", + connection->getName().c_str(), name_.c_str(), relationship.c_str()); + ret = true; + } + } else { + + // We do not have any outgoing connection for this relationship yet + std::set<std::shared_ptr<Connectable>> newConnection; + newConnection.insert(connection); + connection->setSource(shared_from_this()); + _outGoingConnections[relationship] = newConnection; + logger_->log_info( + "Add connection %s into Processor %s outgoing connection for relationship %s", + connection->getName().c_str(), name_.c_str(), relationship.c_str()); + ret = true; + } + } + + return ret; +} + +void Processor::removeConnection(std::shared_ptr<Connectable> conn) { + if (isRunning()) { + logger_->log_info( + "Can not remove connection while the process %s is running", + name_.c_str()); + return; + } + + std::lock_guard<std::mutex> lock(mutex_); + + uuid_t srcUUID; + uuid_t destUUID; + + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + + connection->getSourceUUID(srcUUID); + connection->getDestinationUUID(destUUID); + + if (uuid_compare(uuid_, destUUID) == 0) { + // Connection is destination to the current processor + if (_incomingConnections.find(connection) != _incomingConnections.end()) { + _incomingConnections.erase(connection); + connection->setDestination(NULL); + logger_->log_info( + "Remove connection %s into Processor %s incoming connection", + connection->getName().c_str(), name_.c_str()); + incoming_connections_Iter = this->_incomingConnections.begin(); + } + } + + if (uuid_compare(uuid_, srcUUID) == 0) { + std::string relationship = connection->getRelationship().getName(); + // Connection is source from the current processor + auto &&it = _outGoingConnections.find(relationship); + if (it == _outGoingConnections.end()) { + return; + } else { + if (_outGoingConnections[relationship].find(connection) + != _outGoingConnections[relationship].end()) { + _outGoingConnections[relationship].erase(connection); + connection->setSource(NULL); + logger_->log_info( + "Remove connection %s into Processor %s outgoing connection for relationship %s", + connection->getName().c_str(), name_.c_str(), relationship.c_str()); + } + } + } +} + + + +bool Processor::flowFilesQueued() { + std::lock_guard<std::mutex> lock(mutex_); + + if (_incomingConnections.size() == 0) + return false; + + for (auto &&conn : _incomingConnections) { + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + if (connection->getQueueSize() > 0) + return true; + } + + return false; +} + +bool Processor::flowFilesOutGoingFull() { + std::lock_guard<std::mutex> lock(mutex_); + + for (auto &&connection : _outGoingConnections) { + // We already has connection for this relationship + std::set<std::shared_ptr<Connectable>> existedConnection = connection.second; + for (const auto conn : existedConnection) { + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + if (connection->isFull()) + return true; + } + } + + return false; +} + +void Processor::onTrigger(ProcessContext *context, + ProcessSessionFactory *sessionFactory) { + auto session = sessionFactory->createSession(); + + try { + // Call the virtual trigger function + onTrigger(context, session.get()); + session->commit(); + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + session->rollback(); + throw; + } catch (...) { + logger_->log_debug("Caught Exception Processor::onTrigger"); + session->rollback(); + throw; + } +} + +bool Processor::isWorkAvailable() { + // We have work if any incoming connection has work + bool hasWork = false; + + try { + for (const auto &conn : _incomingConnections) { + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + if (connection->getQueueSize() > 0) { + hasWork = true; + break; + } + } + } catch (...) { + logger_->log_error( + "Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!"); + } + + return hasWork; +} + +} /* namespace processor */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessorNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp new file mode 100644 index 0000000..44491d3 --- /dev/null +++ b/libminifi/src/core/ProcessorNode.cpp @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/ProcessorNode.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor) + : processor_(processor), + Connectable(processor->getName(),0), + ConfigurableComponent(logging::Logger::getLogger()) { + + uuid_t copy; + processor->getUUID(copy); + setUUID( copy ); + + +} + +ProcessorNode::ProcessorNode(const ProcessorNode &other) + : processor_(other.processor_), + Connectable(other.getName(), 0), + ConfigurableComponent(logging::Logger::getLogger()) { + + uuid_t copy; + processor_->getUUID(copy); + setUUID( copy ); + +} + +ProcessorNode::~ProcessorNode() { + +} + +bool ProcessorNode::isWorkAvailable() { + return processor_->isWorkAvailable(); +} + +bool ProcessorNode::isRunning() { + return processor_->isRunning(); +} + +} /* namespace processor */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Property.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp new file mode 100644 index 0000000..287b7ec --- /dev/null +++ b/libminifi/src/core/Property.cpp @@ -0,0 +1,57 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/Property.h" + +namespace org { +namespace apache { +namespace nifi {namespace minifi { +namespace core { + +// Get Name for the property +std::string Property::getName() const { + return name_; +} +// Get Description for the property +std::string Property::getDescription() { + return description_; +} +// Get value for the property +std::string Property::getValue() const { + return value_; +} +// Set value for the property +void Property::setValue(std::string value) { + value_ = value; +} +// Compare +bool Property::operator <(const Property & right) const { + return name_ < right.name_; +} + +const Property &Property::operator=(const Property &other) { + name_ = other.name_; + value_ = other.value_; + return *this; +} + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Record.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Record.cpp b/libminifi/src/core/Record.cpp new file mode 100644 index 0000000..dbf0102 --- /dev/null +++ b/libminifi/src/core/Record.cpp @@ -0,0 +1,223 @@ +/* + * Copyright 2017 <copyright holder> <email> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/FlowFile.h" +#include "core/logging/Logger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +FlowFile::FlowFile() + : size_(0), + id_(0), + stored(false), + offset_(0), + last_queue_date_(0), + penaltyExpiration_ms_(0), + claim_(nullptr), + marked_delete_(false), + connection_(nullptr), + original_connection_() { + entry_date_ = getTimeMillis(); + lineage_start_date_ = entry_date_; + + char uuidStr[37]; + + // Generate the global UUID for the flow record + uuid_generate(uuid_); + + uuid_unparse_lower(uuid_, uuidStr); + uuid_str_ = uuidStr; + + logger_ = logging::Logger::getLogger(); + +} + +FlowFile::~FlowFile() { + +} + +FlowFile& FlowFile::operator=(const FlowFile& other) { + + uuid_copy(uuid_, other.uuid_); + stored = other.stored; + marked_delete_ = other.marked_delete_; + entry_date_ = other.entry_date_; + lineage_start_date_ = other.lineage_start_date_; + lineage_Identifiers_ = other.lineage_Identifiers_; + last_queue_date_ = other.last_queue_date_; + size_ = other.size_; + penaltyExpiration_ms_ = other.penaltyExpiration_ms_; + attributes_ = other.attributes_; + claim_ = other.claim_; + if (claim_ != nullptr) + this->claim_->increaseFlowFileRecordOwnedCount(); + uuid_str_ = other.uuid_str_; + connection_ = other.connection_; + original_connection_ = other.original_connection_; + + return *this; +} + +/** + * Returns whether or not this flow file record + * is marked as deleted. + * @return marked deleted + */ +bool FlowFile::isDeleted() { + return marked_delete_; +} + +/** + * Sets whether to mark this flow file record + * as deleted + * @param deleted deleted flag + */ +void FlowFile::setDeleted(const bool deleted) { + marked_delete_ = deleted; +} + +std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() { + return claim_; +} + +void FlowFile::clearResourceClaim() { + claim_ = nullptr; +} +void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) { + claim_ = claim; +} + +// ! Get Entry Date +uint64_t FlowFile::getEntryDate() { + return entry_date_; +} +uint64_t FlowFile::getEventTime() { + return event_time_; +} +// ! Get Lineage Start Date +uint64_t FlowFile::getlineageStartDate() { + return lineage_start_date_; +} + +std::set<std::string> &FlowFile::getlineageIdentifiers() { + return lineage_Identifiers_; +} + +bool FlowFile::getAttribute(std::string key, std::string &value) { + auto it = attributes_.find(key); + if (it != attributes_.end()) { + value = it->second; + return true; + } else { + return false; + } +} + +// Get Size +uint64_t FlowFile::getSize() { + return size_; +} +// ! Get Offset +uint64_t FlowFile::getOffset() { + return offset_; +} + +bool FlowFile::removeAttribute(const std::string key) { + auto it = attributes_.find(key); + if (it != attributes_.end()) { + attributes_.erase(key); + return true; + } else { + return false; + } +} + +bool FlowFile::updateAttribute(const std::string key, const std::string value) { + auto it = attributes_.find(key); + if (it != attributes_.end()) { + attributes_[key] = value; + return true; + } else { + return false; + } +} + +bool FlowFile::addAttribute(const std::string &key, const std::string &value) { + auto it = attributes_.find(key); + if (it != attributes_.end()) { + // attribute already there in the map + return false; + } else { + attributes_[key] = value; + return true; + } +} + +void FlowFile::setLineageStartDate(const uint64_t date) { + lineage_start_date_ = date; +} + +/** + * Sets the original connection with a shared pointer. + * @param connection shared connection. + */ +void FlowFile::setOriginalConnection( + std::shared_ptr<core::Connectable> &connection) { + original_connection_ = connection; +} + +/** + * Sets the connection with a shared pointer. + * @param connection shared connection. + */ +void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) { + connection_ = connection; +} + +/** + * Sets the connection with a shared pointer. + * @param connection shared connection. + */ +void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) { + connection_ = connection; +} + +/** + * Returns the connection referenced by this record. + * @return shared connection pointer. + */ +std::shared_ptr<core::Connectable> FlowFile::getConnection() { + return connection_; +} + +/** + * Returns the original connection referenced by this record. + * @return shared original connection pointer. + */ +std::shared_ptr<core::Connectable> FlowFile::getOriginalConnection() { + return original_connection_; +} + +} +} +} +} +}
