http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index 24ba146..1060830 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -42,7 +42,7 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) { void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { logger_->log_trace("Enabling CSN in SchedulingAgent %s", serviceNode->getName()); // reference the enable function from serviceNode - std::function<bool()> f_ex = [serviceNode] { + std::function < bool() > f_ex = [serviceNode] { return serviceNode->enable(); }; // create a functor that will be submitted to the thread pool. @@ -55,7 +55,7 @@ void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller:: void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { // reference the disable function from serviceNode - std::function<bool()> f_ex = [serviceNode] { + std::function < bool() > f_ex = [serviceNode] { return serviceNode->disable(); }; // create a functor that will be submitted to the thread pool. @@ -77,13 +77,15 @@ bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, core // No need to yield, reset yield expiration to 0 processor->clearYield(); - if (!hasWorkToDo(processor)) + if (!hasWorkToDo(processor)) { // No work to do, yield return true; - - if (hasTooMuchOutGoing(processor)) + } + if (hasTooMuchOutGoing(processor)) { + logger_->log_debug("backpressure applied because too much outgoing"); // need to apply backpressure return true; + } processor->incrementActiveTasks(); try {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp index 7d6e3f3..024bd35 100644 --- a/libminifi/src/Site2SiteClientProtocol.cpp +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@ -726,13 +726,19 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet if (ret != 8) { return false; } - if (flowFile->getSize()) { + if (flowFile->getSize() > 0) { Site2SiteClientProtocol::ReadCallback callback(packet); session->read(flowFile, &callback); if (flowFile->getSize() != packet->_size) { return false; } } + if (packet->payload_.length() == 0 && len == 0) { + if (flowFile->getResourceClaim() == nullptr) + logger_->log_debug("no claim"); + else + logger_->log_debug("Flowfile empty %s", flowFile->getResourceClaim()->getContentFullPath()); + } } else if (packet->payload_.length() > 0) { len = packet->payload_.length(); @@ -1101,8 +1107,9 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c Transaction *transaction = NULL; - if (!flow) + if (!flow) { return; + } if (_peerState != READY) { bootstrap(); @@ -1158,11 +1165,15 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c } // while true if (!confirm(transactionID)) { - throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); + std::stringstream ss; + ss << "Confirm Failed for " << transactionID; + throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str()); return; } if (!complete(transactionID)) { - throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); + std::stringstream ss; + ss << "Complete Failed for " << transactionID; + throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str()); return; } logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), 
transaction->_transfers, transaction->_bytes); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/ThreadedSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 46a4710..7b4ce85 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -36,7 +36,7 @@ namespace nifi { namespace minifi { void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); admin_yield_duration_ = 0; std::string yieldValue; @@ -68,8 +68,8 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo } core::ProcessorNode processor_node(processor); - auto processContext = std::make_shared<core::ProcessContext>(processor_node, controller_service_provider_, repo_); - auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext.get()); + auto processContext = std::make_shared < core::ProcessContext > (processor_node, controller_service_provider_, repo_, flow_repo_, content_repo_); + auto sessionFactory = std::make_shared < core::ProcessSessionFactory > (processContext.get()); processor->onSchedule(processContext.get(), sessionFactory.get()); @@ -89,7 +89,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo } void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str()); if (processor->getScheduledState() != core::RUNNING) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/controllers/SSLContextService.cpp 
---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp index a9450f6..73c9e35 100644 --- a/libminifi/src/controllers/SSLContextService.cpp +++ b/libminifi/src/controllers/SSLContextService.cpp @@ -35,7 +35,7 @@ void SSLContextService::initialize() { if (initialized_) return; - std::lock_guard<std::mutex> lock(initialization_mutex_); + std::lock_guard < std::mutex > lock(initialization_mutex_); ControllerService::initialize(); @@ -75,31 +75,31 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() { if (retp == 0) { logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno)); } - return std::unique_ptr<SSLContext>(new SSLContext(ctx)); + return std::unique_ptr < SSLContext > (new SSLContext(ctx)); } const std::string &SSLContextService::getCertificateFile() { - std::lock_guard<std::mutex> lock(initialization_mutex_); + std::lock_guard < std::mutex > lock(initialization_mutex_); return certificate; } const std::string &SSLContextService::getPassphrase() { - std::lock_guard<std::mutex> lock(initialization_mutex_); + std::lock_guard < std::mutex > lock(initialization_mutex_); return passphrase_; } const std::string &SSLContextService::getPassphraseFile() { - std::lock_guard<std::mutex> lock(initialization_mutex_); + std::lock_guard < std::mutex > lock(initialization_mutex_); return passphrase_file_; } const std::string &SSLContextService::getPrivateKeyFile() { - std::lock_guard<std::mutex> lock(initialization_mutex_); + std::lock_guard < std::mutex > lock(initialization_mutex_); return private_key_; } const std::string &SSLContextService::getCACertificate() { - std::lock_guard<std::mutex> lock(initialization_mutex_); + std::lock_guard < std::mutex > lock(initialization_mutex_); return ca_certificate_; } 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/ClassLoader.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp index 9bead0e..fbd46f6 100644 --- a/libminifi/src/core/ClassLoader.cpp +++ b/libminifi/src/core/ClassLoader.cpp @@ -43,7 +43,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) { logger_->log_error("Cannot load library: %s", dlerror()); return RESOURCE_FAILURE; } else { - std::lock_guard<std::mutex> lock(internal_mutex_); + std::lock_guard < std::mutex > lock(internal_mutex_); dl_handles_.push_back(resource_ptr); } @@ -60,9 +60,9 @@ uint16_t ClassLoader::registerResource(const std::string &resource) { ObjectFactory *factory = create_factory_func(); - std::lock_guard<std::mutex> lock(internal_mutex_); + std::lock_guard < std::mutex > lock(internal_mutex_); - loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(factory); + loaded_factories_[factory->getClassName()] = std::unique_ptr < ObjectFactory > (factory); return RESOURCE_SUCCESS; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index f5247ac..62a08db 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -29,6 +29,7 @@ namespace apache { namespace nifi { namespace minifi { namespace core { + ConfigurableComponent::ConfigurableComponent() : logger_(logging::LoggerFactory<ConfigurableComponent>::getLogger()) { } @@ -42,7 +43,7 @@ ConfigurableComponent::~ConfigurableComponent() { } bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) { - std::lock_guard<std::mutex> lock(configuration_mutex_); + 
std::lock_guard < std::mutex > lock(configuration_mutex_); auto &&it = properties_.find(name); @@ -61,7 +62,7 @@ bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) * @return result of getting property. */ bool ConfigurableComponent::getProperty(const std::string name, std::string &value) { - std::lock_guard<std::mutex> lock(configuration_mutex_); + std::lock_guard < std::mutex > lock(configuration_mutex_); auto &&it = properties_.find(name); if (it != properties_.end()) { @@ -80,7 +81,7 @@ bool ConfigurableComponent::getProperty(const std::string name, std::string &val * @return result of setting property. */ bool ConfigurableComponent::setProperty(const std::string name, std::string value) { - std::lock_guard<std::mutex> lock(configuration_mutex_); + std::lock_guard < std::mutex > lock(configuration_mutex_); auto &&it = properties_.find(name); if (it != properties_.end()) { @@ -101,7 +102,7 @@ bool ConfigurableComponent::setProperty(const std::string name, std::string valu * @return result of setting property. 
*/ bool ConfigurableComponent::updateProperty(const std::string &name, const std::string &value) { - std::lock_guard<std::mutex> lock(configuration_mutex_); + std::lock_guard < std::mutex > lock(configuration_mutex_); auto &&it = properties_.find(name); if (it != properties_.end()) { @@ -122,7 +123,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, const std::s * @return whether property was set or not */ bool ConfigurableComponent::setProperty(Property &prop, std::string value) { - std::lock_guard<std::mutex> lock(configuration_mutex_); + std::lock_guard < std::mutex > lock(configuration_mutex_); auto it = properties_.find(prop.getName()); if (it != properties_.end()) { @@ -150,7 +151,7 @@ bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties return false; } - std::lock_guard<std::mutex> lock(configuration_mutex_); + std::lock_guard < std::mutex > lock(configuration_mutex_); properties_.clear(); for (auto item : properties) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/ConfigurationFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp index ea2ed5c..0a0e911 100644 --- a/libminifi/src/core/ConfigurationFactory.cpp +++ b/libminifi/src/core/ConfigurationFactory.cpp @@ -39,7 +39,8 @@ namespace core { class YamlConfiguration; #endif -std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, +std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, + std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure, std::shared_ptr<io::StreamFactory> stream_factory, const std::string 
configuration_class_name, const std::string path, bool fail_safe) { std::string class_name_lc = configuration_class_name; @@ -47,22 +48,23 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr try { if (class_name_lc == "flowconfiguration") { // load the base configuration. - return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path)); + + return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path)); } else if (class_name_lc == "yamlconfiguration") { // only load if the class is defined. - return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, stream_factory, configure, path)); + return std::unique_ptr < core::FlowConfiguration > (instantiate<core::YamlConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path)); } else { if (fail_safe) { - return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path)); + return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path)); } else { throw std::runtime_error("Support for the provided configuration class could not be found"); } } } catch (const std::runtime_error &r) { if (fail_safe) { - return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path)); + return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path)); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/Connectable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Connectable.cpp 
b/libminifi/src/core/Connectable.cpp index cf01f0c..9c3b26a 100644 --- a/libminifi/src/core/Connectable.cpp +++ b/libminifi/src/core/Connectable.cpp @@ -53,7 +53,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio return false; } - std::lock_guard<std::mutex> lock(relationship_mutex_); + std::lock_guard < std::mutex > lock(relationship_mutex_); relationships_.clear(); for (auto item : relationships) { @@ -67,7 +67,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio bool Connectable::isSupportedRelationship(core::Relationship relationship) { const bool requiresLock = isRunning(); - const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_); + const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_); const auto &it = relationships_.find(relationship.getName()); if (it != relationships_.end()) { @@ -83,7 +83,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation return false; } - std::lock_guard<std::mutex> lock(relationship_mutex_); + std::lock_guard < std::mutex > lock(relationship_mutex_); auto_terminated_relationships_.clear(); for (auto item : relationships) { @@ -97,7 +97,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation bool Connectable::isAutoTerminated(core::Relationship relationship) { const bool requiresLock = isRunning(); - const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_); + const auto conditionalLock = !requiresLock ? 
std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_); const auto &it = auto_terminated_relationships_.find(relationship.getName()); if (it != auto_terminated_relationships_.end()) { @@ -111,7 +111,7 @@ void Connectable::waitForWork(uint64_t timeoutMs) { has_work_.store(isWorkAvailable()); if (!has_work_.load()) { - std::unique_lock<std::mutex> lock(work_available_mutex_); + std::unique_lock < std::mutex > lock(work_available_mutex_); work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] {return has_work_.load();}); } } @@ -143,7 +143,7 @@ std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(std:: } std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() { - std::lock_guard<std::mutex> lock(relationship_mutex_); + std::lock_guard < std::mutex > lock(relationship_mutex_); if (_incomingConnections.size() == 0) return NULL; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/Core.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp index 304d4ce..995c001 100644 --- a/libminifi/src/core/Core.cpp +++ b/libminifi/src/core/Core.cpp @@ -35,6 +35,11 @@ void CoreComponent::setUUID(uuid_t uuid) { uuid_unparse_lower(uuid_, uuidStr); uuidStr_ = uuidStr; } + +void CoreComponent::setUUIDStr(const std::string uuidStr) { + uuid_parse(uuidStr.c_str(), uuid_); + uuidStr_ = uuidStr; +} // Get UUID bool CoreComponent::getUUID(uuid_t uuid) { if (uuid) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index c32add6..e8e7462 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -35,7 +35,7 @@ 
std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string if (nullptr == ptr) { logger_->log_error("No Processor defined for %s", name.c_str()); } - std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr); + std::shared_ptr<core::Processor> processor = std::static_pointer_cast < core::Processor > (ptr); // initialize the processor processor->initialize(); @@ -53,18 +53,16 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() return processor; } -std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup( - std::string name, uuid_t uuid, int version) { - return std::unique_ptr<core::ProcessGroup>( - new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version)); +std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int version) { + return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version)); } std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) { - return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid)); + return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid)); } std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) { - return std::make_shared<minifi::Connection>(flow_file_repo_, name, uuid); + return std::make_shared < minifi::Connection > (flow_file_repo_, content_repo_, name, uuid); } std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/FlowFile.cpp ---------------------------------------------------------------------- diff 
--git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp index d9057c5..6afd0fe 100644 --- a/libminifi/src/core/FlowFile.cpp +++ b/libminifi/src/core/FlowFile.cpp @@ -47,7 +47,7 @@ FlowFile::FlowFile() entry_date_ = getTimeMillis(); lineage_start_date_ = entry_date_; - char uuidStr[37]; + char uuidStr[37] = { 0 }; // Generate the global UUID for the flow record id_generator_->generate(uuid_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 2cf3db0..db0fe08 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -39,8 +39,7 @@ namespace core { std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator(); -ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, - ProcessGroup *parent) +ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, ProcessGroup *parent) : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()), name_(name), type_(type), @@ -55,7 +54,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, yield_period_msec_ = 0; transmitting_ = false; - logger_->log_info("ProcessGroup %s created", name_.c_str()); + logger_->log_info("ProcessGroup %s created", name_); } ProcessGroup::~ProcessGroup() { @@ -70,12 +69,12 @@ ProcessGroup::~ProcessGroup() { } bool ProcessGroup::isRootProcessGroup() { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); return (type_ == ROOT_PROCESS_GROUP); } void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); if 
(processors_.find(processor) == processors_.end()) { // We do not have the same processor in this process group yet @@ -85,7 +84,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) { } void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); if (processors_.find(processor) != processors_.end()) { // We do have the same processor in this process group yet @@ -95,7 +94,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) { } void ProcessGroup::addProcessGroup(ProcessGroup *child) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); if (child_process_groups_.find(child) == child_process_groups_.end()) { // We do not have the same child process group in this process group yet @@ -105,7 +104,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) { } void ProcessGroup::removeProcessGroup(ProcessGroup *child) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); if (child_process_groups_.find(child) != child_process_groups_.end()) { // We do have the same child process group in this process group yet @@ -115,7 +114,7 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) { } void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); try { // Start all the processor node, input and output ports @@ -143,7 +142,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, Ev } void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < 
std::recursive_mutex > lock(mutex_); try { // Stop all the processor node, input and output ports @@ -169,7 +168,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Eve } std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); std::shared_ptr<Processor> ret = NULL; for (auto processor : processors_) { logger_->log_info("find processor %s", processor->getName().c_str()); @@ -209,7 +208,7 @@ std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findContr } std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); std::shared_ptr<Processor> ret = NULL; for (auto processor : processors_) { logger_->log_debug("Current processor is %s", processor->getName().c_str()); @@ -225,7 +224,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &proces } void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); for (auto processor : processors_) { if (processor->getName() == processorName) { processor->setProperty(propertyName, propertyValue); @@ -247,7 +246,7 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecti } void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); if (connections_.find(connection) == connections_.end()) { // We do not have the same connection in this process group yet @@ -269,7 +268,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { } void 
ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) { - std::lock_guard<std::recursive_mutex> lock(mutex_); + std::lock_guard < std::recursive_mutex > lock(mutex_); if (connections_.find(connection) != connections_.end()) { // We do not have the same connection in this process group yet http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index df21a34..c69b361 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -38,19 +38,21 @@ namespace core { std::shared_ptr<core::FlowFile> ProcessSession::create() { std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty); + + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); _addedFlowFiles[record->getUUIDStr()] = record; logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str()); - std::string details = process_context_->getProcessorNode().getName() + " creates flow record " + record->getUUIDStr(); - provenance_report_->create(record, details); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " creates flow record " << record->getUUIDStr(); + provenance_report_->create(record, details.str()); return record; } std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) { std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty); + std::shared_ptr<core::FlowFile> record = 
std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); if (record) { _addedFlowFiles[record->getUUIDStr()] = record; @@ -92,7 +94,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::Flow std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent) { std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty); + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); if (record) { this->_clonedFlowFiles[record->getUUIDStr()] = record; @@ -168,26 +170,30 @@ void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) { void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) { flow->setAttribute(key, value); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value; - provenance_report_->modifyAttributes(flow, details); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value; + provenance_report_->modifyAttributes(flow, details.str()); } void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key) { flow->removeAttribute(key); - std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key; - provenance_report_->modifyAttributes(flow, details); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " + key; + 
provenance_report_->modifyAttributes(flow, details.str()); } void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value) { flow->setAttribute(key, value); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value; - provenance_report_->modifyAttributes(flow, details); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value; + provenance_report_->modifyAttributes(flow, details.str()); } void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key) { flow->removeAttribute(key); - std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key; - provenance_report_->modifyAttributes(flow, details); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " << key; + provenance_report_->modifyAttributes(flow, details.str()); } void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) { @@ -207,41 +213,41 @@ void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow, Relationsh } void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) { - std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>( - DEFAULT_CONTENT_DIRECTORY); + std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); try { - std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - if (fs.is_open()) { - // Call the callback to write the content - callback->process(&fs); - if (fs.good() && fs.tellp() >= 0) 
{ - flow->setSize(fs.tellp()); - flow->setOffset(0); - std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim(); - if (flow_claim != nullptr) { - // Remove the old claim - flow_claim->decreaseFlowFileRecordOwnedCount(); - flow->clearResourceClaim(); - } - flow->setResourceClaim(claim); - claim->increaseFlowFileRecordOwnedCount(); - /* - logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); - provenance_report_->modifyContent(flow, details, endTime - startTime); - } else { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); - } - } else { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + claim->increaseFlowFileRecordOwnedCount(); +// fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); + std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim); + // Call the callback to write the content + if (nullptr == stream) { + rollback(); + return; + } + if (callback->process(stream) < 0) { + rollback(); + return; + } + + flow->setSize(stream->getSize()); + flow->setOffset(0); + std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim(); + if (flow_claim != nullptr) { + // Remove the old claim + flow_claim->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); } + flow->setResourceClaim(claim); + + /* + logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + stream->closeStream(); + std::stringstream details; + details << 
process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { if (flow && flow->getResourceClaim() == claim) { flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); @@ -260,39 +266,34 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCa } void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) { - std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); + std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); try { - std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - if (fs.is_open()) { - // Call the callback to write the content - callback->process(&fs); - if (fs.good() && fs.tellp() >= 0) { - flow->setSize(fs.tellp()); - flow->setOffset(0); - std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim(); - if (flow_claim != nullptr) { - // Remove the old claim - flow_claim->decreaseFlowFileRecordOwnedCount(); - flow->clearResourceClaim(); - } - flow->setResourceClaim(claim); - claim->increaseFlowFileRecordOwnedCount(); - /* - logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); - provenance_report_->modifyContent(flow, details, endTime - startTime); - } else { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); - } - } else { - throw 
Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + claim->increaseFlowFileRecordOwnedCount(); + std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim); + if (nullptr == stream) { + rollback(); + return; + } + // Call the callback to write the content + if (callback->process(stream) < 0) { + rollback(); + return; + } + flow->setSize(stream->getSize()); + flow->setOffset(0); + std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim(); + if (flow_claim != nullptr) { + // Remove the old claim + flow_claim->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); } + flow->setResourceClaim(claim); + + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { if (flow && flow->getResourceClaim() == claim) { flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); @@ -321,30 +322,25 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, OutputStream claim = flow->getResourceClaim(); try { - std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app); - if (fs.is_open()) { - // Call the callback to write the content - std::streampos oldPos = fs.tellp(); - callback->process(&fs); - if (fs.good() && fs.tellp() >= 0) { - uint64_t appendSize = fs.tellp() - oldPos; - flow->setSize(flow->getSize() + appendSize); - /* - logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", - flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + 
flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); - provenance_report_->modifyContent(flow, details, endTime - startTime); - } else { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); - } - } else { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); - } + std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim); + if (nullptr == stream) { + rollback(); + return; + } + // Call the callback to write the content + size_t oldPos = stream->getSize(); + stream->seek(oldPos + 1); + if (callback->process(stream) < 0) { + rollback(); + return; + } + uint64_t appendSize = stream->getSize() - oldPos; + flow->setSize(flow->getSize() + appendSize); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); throw; @@ -365,30 +361,26 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamC claim = flow->getResourceClaim(); try { - std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app); - if (fs.is_open()) { - // Call the callback to write the content - std::streampos oldPos = fs.tellp(); - callback->process(&fs); - if (fs.good() && fs.tellp() >= 0) { - uint64_t appendSize = fs.tellp() - oldPos; - flow->setSize(flow->getSize() + appendSize); - /* - logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", - flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - std::string details = process_context_->getProcessorNode().getName() + " modify flow 
record content " + flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); - provenance_report_->modifyContent(flow, details, endTime - startTime); - } else { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); - } - } else { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); - } + std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim); + if (nullptr == stream) { + rollback(); + return; + } + // Call the callback to write the content + size_t oldPos = stream->getSize(); + stream->seek(oldPos + 1); + if (callback->process(stream) < 0) { + rollback(); + return; + } + uint64_t appendSize = stream->getSize() - oldPos; + flow->setSize(flow->getSize() + appendSize); + + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); throw; @@ -408,23 +400,19 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, InputStreamCall } claim = flow->getResourceClaim(); - std::ifstream fs; - fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary); - if (fs.is_open()) { - fs.seekg(flow->getOffset(), fs.beg); - - if (fs.good()) { - callback->process(&fs); - /* - logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s", - flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - } else { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error"); - } - } else { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + + std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim); + + if (nullptr == stream) { + rollback(); + return; + 
} + + stream->seek(flow->getOffset()); + + if (callback->process(stream) < 0) { + rollback(); + return; } } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); @@ -445,23 +433,17 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCal } claim = flow->getResourceClaim(); - std::ifstream fs; - fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary); - if (fs.is_open()) { - fs.seekg(flow->getOffset(), fs.beg); - - if (fs.good()) { - callback->process(&fs); - /* - logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s", - flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - } else { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error"); - } - } else { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim); + + if (nullptr == stream) { + rollback(); + return; + } + stream->seek(flow->getOffset()); + + if (callback->process(stream) < 0) { + rollback(); + return; } } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); @@ -479,60 +461,55 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCal * */ void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow) { - std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); - + std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); int max_read = getpagesize(); std::vector<uint8_t> charBuffer; charBuffer.resize(max_read); try { - std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - - if (fs.is_open()) { - size_t position = 0; - 
const size_t max_size = stream.getSize(); - size_t read_size = max_read; - while (position < max_size) { - if ((max_size - position) > max_read) { - read_size = max_read; - } else { - read_size = max_size - position; - } - charBuffer.clear(); - stream.readData(charBuffer, read_size); - - fs.write((const char*) charBuffer.data(), read_size); - position += read_size; + claim->increaseFlowFileRecordOwnedCount(); + std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim); + + if (nullptr == content_stream) { + logger_->log_debug("Could not obtain claim for %s", claim->getContentFullPath()); + rollback(); + return; + } + size_t position = 0; + const size_t max_size = stream.getSize(); + size_t read_size = max_read; + while (position < max_size) { + if ((max_size - position) > max_read) { + read_size = max_read; + } else { + read_size = max_size - position; } - // Open the source file and stream to the flow file + charBuffer.clear(); + stream.readData(charBuffer, read_size); - if (fs.good() && fs.tellp() >= 0) { - flow->setSize(fs.tellp()); - flow->setOffset(0); - if (flow->getResourceClaim() != nullptr) { - // Remove the old claim - flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); - flow->clearResourceClaim(); - } - flow->setResourceClaim(claim); - claim->increaseFlowFileRecordOwnedCount(); - - logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), - flow->getUUIDStr().c_str()); + content_stream->write(charBuffer.data(), read_size); + position += read_size; + } + // Open the source file and stream to the flow file - fs.close(); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); - provenance_report_->modifyContent(flow, details, endTime - startTime); - } else { - fs.close(); - 
throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); - } - } else { - throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + flow->setSize(content_stream->getSize()); + flow->setOffset(0); + if (flow->getResourceClaim() != nullptr) { + // Remove the old claim + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); } + flow->setResourceClaim(claim); + + logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), + flow->getUUIDStr().c_str()); + + content_stream->closeStream(); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { if (flow && flow->getResourceClaim() == claim) { flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); @@ -550,34 +527,44 @@ void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::Fl } } -void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow, -bool keepSource, - uint64_t offset) { - std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); +void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow, bool keepSource, uint64_t offset) { + std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); char *buf = NULL; int size = 4096; buf = new char[size]; try { - std::ofstream fs; + // std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); std::ifstream input; input.open(source.c_str(), std::fstream::in | std::fstream::binary); - - if (fs.is_open() && 
input.is_open()) { + claim->increaseFlowFileRecordOwnedCount(); + std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim); + if (nullptr == stream) { + rollback(); + return; + } + if (input.is_open()) { // Open the source file and stream to the flow file - input.seekg(offset, fs.beg); + input.seekg(offset); + bool invalidWrite = false; while (input.good()) { input.read(buf, size); - if (input) - fs.write(buf, size); - else - fs.write(buf, input.gcount()); + if (input) { + if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) { + invalidWrite = true; + break; + } + } else { + if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) { + invalidWrite = true; + break; + } + } } - if (fs.good() && fs.tellp() >= 0) { - flow->setSize(fs.tellp()); + if (!invalidWrite) { + flow->setSize(stream->getSize()); flow->setOffset(0); if (flow->getResourceClaim() != nullptr) { // Remove the old claim @@ -585,20 +572,20 @@ bool keepSource, flow->clearResourceClaim(); } flow->setResourceClaim(claim); - claim->increaseFlowFileRecordOwnedCount(); logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); - fs.close(); + stream->closeStream(); input.close(); if (!keepSource) std::remove(source.c_str()); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); - provenance_report_->modifyContent(flow, details, endTime - startTime); + provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } else { - fs.close(); + stream->closeStream(); input.close(); throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); } 
@@ -626,8 +613,7 @@ bool keepSource, } } -void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows, - bool keepSource, uint64_t offset, char inputDelimiter) { +void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows, bool keepSource, uint64_t offset, char inputDelimiter) { std::shared_ptr<ResourceClaim> claim; std::shared_ptr<FlowFileRecord> flowFile; @@ -639,48 +625,61 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow try { // Open the input file and seek to the appropriate location. std::ifstream input; + logger_->log_debug("Opening %s", source); input.open(source.c_str(), std::fstream::in | std::fstream::binary); if (input.is_open()) { input.seekg(offset, input.beg); while (input.good()) { + bool invalidWrite = false; flowFile = std::static_pointer_cast<FlowFileRecord>(create()); - claim = std::make_shared<ResourceClaim>(); + claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); uint64_t startTime = getTimeMillis(); input.getline(buf, size, inputDelimiter); - std::ofstream fs; - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - - if (fs.is_open()) { - if (input) - fs.write(buf, strlen(buf)); - else - fs.write(buf, input.gcount()); - - if (fs.good() && fs.tellp() >= 0) { - flowFile->setSize(fs.tellp()); - flowFile->setOffset(0); - if (flowFile->getResourceClaim() != nullptr) { - // Remove the old claim - flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); - flowFile->clearResourceClaim(); - } - flowFile->setResourceClaim(claim); - claim->increaseFlowFileRecordOwnedCount(); - logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(), - flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath().c_str(), - flowFile->getUUIDStr().c_str()); + std::shared_ptr<io::BaseStream> stream = 
process_context_->getContentRepository()->write(claim); + if (nullptr == stream) { + logger_->log_debug("Stream is null"); + rollback(); + return; + } - fs.close(); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr(); - uint64_t endTime = getTimeMillis(); - provenance_report_->modifyContent(flowFile, details, endTime - startTime); - flows.push_back(flowFile); + if (input) { + if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) { + invalidWrite = true; + break; + } + } else { + if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) { + invalidWrite = true; + break; + } + } - } else { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile"); + if (!invalidWrite) { + flowFile->setSize(stream->getSize()); + flowFile->setOffset(0); + if (flowFile->getResourceClaim() != nullptr) { + // Remove the old claim + flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flowFile->clearResourceClaim(); } + flowFile->setResourceClaim(claim); + claim->increaseFlowFileRecordOwnedCount(); + + logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(), + flowFile->getSize(), + flowFile->getResourceClaim()->getContentFullPath().c_str(), + flowFile->getUUIDStr().c_str()); + + stream->closeStream(); + std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flowFile, details, endTime - startTime); + flows.push_back(flowFile); + } else { + logger_->log_debug("Error while writing"); + stream->closeStream(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile"); } } input.close(); @@ -711,35 +710,44 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow } } -void 
ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow, -bool keepSource, - uint64_t offset) { - std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); - +void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow, bool keepSource, uint64_t offset) { + std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); char *buf = NULL; int size = 4096; buf = new char[size]; try { - std::ofstream fs; + // std::ofstream fs; uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); std::ifstream input; input.open(source.c_str(), std::fstream::in | std::fstream::binary); - - if (fs.is_open() && input.is_open()) { + claim->increaseFlowFileRecordOwnedCount(); + std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim); + if (nullptr == stream) { + rollback(); + return; + } + if (input.is_open()) { // Open the source file and stream to the flow file - input.seekg(offset, fs.beg); + input.seekg(offset); + int sizeWritten = 0; + bool invalidWrite = false; while (input.good()) { input.read(buf, size); - if (input) - fs.write(buf, size); - else - fs.write(buf, input.gcount()); + if (input) { + if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) { + invalidWrite = true; + break; + } + } else { + if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) { + invalidWrite = true; + break; + } + } } - - if (fs.good() && fs.tellp() >= 0) { - flow->setSize(fs.tellp()); + if (!invalidWrite) { + flow->setSize(stream->getSize()); flow->setOffset(0); if (flow->getResourceClaim() != nullptr) { // Remove the old claim @@ -747,20 +755,20 @@ bool keepSource, flow->clearResourceClaim(); } flow->setResourceClaim(claim); - claim->increaseFlowFileRecordOwnedCount(); logger_->log_debug("Import offset %d length %d into content 
%s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); - fs.close(); + stream->closeStream(); input.close(); if (!keepSource) std::remove(source.c_str()); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr(); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); - provenance_report_->modifyContent(flow, details, endTime - startTime); + provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } else { - fs.close(); + stream->closeStream(); input.close(); throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); } @@ -834,7 +842,7 @@ void ProcessSession::commit() { } } - // Do the samething for added flow file + // Do the same thing for added flow file for (const auto it : _addedFlowFiles) { std::shared_ptr<core::FlowFile> record = it.second; if (record->isDeleted()) @@ -851,6 +859,7 @@ void ProcessSession::commit() { std::string message = "Connect empty for non auto terminated relationship " + relationship.getName(); throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); } else { + logger_->log_debug("added flow file is auto terminated"); // Autoterminated remove(record); } @@ -947,7 +956,7 @@ void ProcessSession::rollback() { _addedFlowFiles.clear(); _updatedFlowFiles.clear(); _deletedFlowFiles.clear(); - logger_->log_trace("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str()); + logger_->log_debug("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str()); } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); throw; @@ -960,8 +969,10 @@ void ProcessSession::rollback() { std::shared_ptr<core::FlowFile> ProcessSession::get() { 
std::shared_ptr<Connectable> first = process_context_->getProcessorNode().getNextIncomingConnection(); - if (first == NULL) + if (first == NULL) { + logger_->log_debug("Get is null for %s", process_context_->getProcessorNode().getName()); return NULL; + } std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(first); @@ -972,8 +983,9 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() { // Remove expired flow record for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired.begin(); it != expired.end(); ++it) { std::shared_ptr<core::FlowFile> record = *it; - std::string details = process_context_->getProcessorNode().getName() + " expire flow record " + record->getUUIDStr(); - provenance_report_->expire(record, details); + std::stringstream details; + details << process_context_->getProcessorNode().getName() << " expire flow record " << record->getUUIDStr(); + provenance_report_->expire(record, details.str()); } } if (ret) { @@ -981,10 +993,9 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() { ret->setDeleted(false); _updatedFlowFiles[ret->getUUIDStr()] = ret; std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty); + std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str()); snapshot = ret; -// snapshot->duplicate(ret); // save a snapshot _originalFlowFiles[snapshot->getUUIDStr()] = snapshot; return ret; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/ProcessSessionFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp index 
31b7481..570d895 100644 --- a/libminifi/src/core/ProcessSessionFactory.cpp +++ b/libminifi/src/core/ProcessSessionFactory.cpp @@ -28,7 +28,7 @@ namespace minifi { namespace core { std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() { - return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_)); + return std::unique_ptr < ProcessSession > (new ProcessSession(process_context_)); } } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index 7b07638..0c2e7cf 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -62,7 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid) active_tasks_ = 0; yield_expiration_ = 0; incoming_connections_Iter = this->_incomingConnections.begin(); - logger_->log_info("Processor %s created UUID %s", name_.c_str(), uuidStr_.c_str()); + logger_->log_info("Processor %s created UUID %s", name_, uuidStr_); } bool Processor::isRunning() { @@ -80,8 +80,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { logger_->log_info("Can not add connection while the process %s is running", name_.c_str()); return false; } - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); - std::lock_guard<std::mutex> lock(mutex_); + std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn); + std::lock_guard < std::mutex > lock(mutex_); uuid_t srcUUID; uuid_t destUUID; @@ -141,12 +141,12 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { return; } - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); uuid_t srcUUID; uuid_t destUUID; - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + 
std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn); connection->getSourceUUID(srcUUID); connection->getDestinationUUID(destUUID); @@ -178,13 +178,13 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { } bool Processor::flowFilesQueued() { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); if (_incomingConnections.size() == 0) return false; for (auto &&conn : _incomingConnections) { - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn); if (connection->getQueueSize() > 0) return true; } @@ -193,13 +193,13 @@ bool Processor::flowFilesQueued() { } bool Processor::flowFilesOutGoingFull() { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); for (auto &&connection : out_going_connections_) { // We already has connection for this relationship std::set<std::shared_ptr<Connectable>> existedConnection = connection.second; for (const auto conn : existedConnection) { - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + std::shared_ptr < Connection > connection = std::static_pointer_cast < Connection > (conn); if (connection->isFull()) return true; } @@ -232,7 +232,7 @@ bool Processor::isWorkAvailable() { try { for (const auto &conn : _incomingConnections) { - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn); if (connection->getQueueSize() > 0) { hasWork = true; break; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/Repository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp index 50e8cd2..cf26a0d 100644 --- 
a/libminifi/src/core/Repository.cpp +++ b/libminifi/src/core/Repository.cpp @@ -19,13 +19,14 @@ #include <arpa/inet.h> #include <cstdint> #include <vector> + +#include "../../include/core/repository/FlowFileRepository.h" #include "io/DataStream.h" #include "io/Serializable.h" #include "core/Relationship.h" #include "core/logging/Logger.h" #include "FlowController.h" #include "provenance/Provenance.h" -#include "core/repository/FlowFileRepository.h" namespace org { namespace apache { @@ -38,9 +39,8 @@ void Repository::start() { return; if (running_) return; - thread_ = std::thread(&Repository::threadExecutor, this); - thread_.detach(); running_ = true; + thread_ = std::thread(&Repository::threadExecutor, this); logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/RepositoryFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index cf18601..9e99718 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -18,13 +18,17 @@ #include <memory> #include <string> #include <algorithm> +#include "core/ContentRepository.h" +#include "core/repository/FileSystemRepository.h" +#include "core/repository/VolatileContentRepository.h" #include "core/Repository.h" #ifdef LEVELDB_SUPPORT #include "core/repository/FlowFileRepository.h" #include "provenance/ProvenanceRepository.h" #endif -#include "core/repository/VolatileRepository.h" +#include "core/repository/VolatileProvenanceRepository.h" +#include "core/repository/VolatileFlowFileRepository.h" namespace org { namespace apache { @@ -48,14 +52,14 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati try { std::shared_ptr<core::Repository> return_obj = nullptr; if (class_name_lc == "flowfilerepository") { - std::cout << 
"creating flow" << std::endl; return_obj = instantiate<core::repository::FlowFileRepository>(repo_name); } else if (class_name_lc == "provenancerepository") { return_obj = instantiate<provenance::ProvenanceRepository>(repo_name); - } else if (class_name_lc == "volatilerepository") { - return_obj = instantiate<repository::VolatileRepository>(repo_name); + } else if (class_name_lc == "volatileflowfilerepository") { + return_obj = instantiate<repository::VolatileFlowFileRepository>(repo_name); + } else if (class_name_lc == "volatileprovenancefilerepository") { + return_obj = instantiate<repository::VolatileProvenanceRepository>(repo_name); } else if (class_name_lc == "nooprepository") { - std::cout << "creating noop" << std::endl; return_obj = instantiate<core::Repository>(repo_name); } @@ -63,13 +67,42 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati return return_obj; } if (fail_safe) { - return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1); + return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1); } else { throw std::runtime_error("Support for the provided configuration class could not be found"); } } catch (const std::runtime_error &r) { if (fail_safe) { - return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1); + return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1); + } + } + + throw std::runtime_error("Support for the provided configuration class could not be found"); +} + +std::shared_ptr<core::ContentRepository> createContentRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) { + std::shared_ptr<core::ContentRepository> return_obj = nullptr; + std::string class_name_lc = configuration_class_name; + std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower); + try { + std::shared_ptr<core::ContentRepository> return_obj = nullptr; + if (class_name_lc 
== "volatilecontentrepository") { + return_obj = instantiate<core::repository::VolatileContentRepository>(repo_name); + } else { + return_obj = instantiate<core::repository::FileSystemRepository>(repo_name); + } + + if (return_obj) { + return return_obj; + } + if (fail_safe) { + return std::make_shared < core::repository::FileSystemRepository > ("fail_safe"); + } else { + throw std::runtime_error("Support for the provided configuration class could not be found"); + } + } catch (const std::runtime_error &r) { + if (fail_safe) { + return std::make_shared < core::repository::FileSystemRepository > ("fail_safe"); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/controller/StandardControllerServiceNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp index 5c4aa70..69004c1 100644 --- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp +++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp @@ -27,12 +27,12 @@ namespace minifi { namespace core { namespace controller { std::shared_ptr<core::ProcessGroup> &StandardControllerServiceNode::getProcessGroup() { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); return process_group_; } void StandardControllerServiceNode::setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); process_group_ = processGroup; } @@ -45,7 +45,7 @@ bool StandardControllerServiceNode::enable() { for (auto linked_service : property.getValues()) { std::shared_ptr<ControllerServiceNode> csNode = provider->getControllerServiceNode(linked_service); if (nullptr != csNode) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); 
linked_controller_services_.push_back(csNode); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/logging/LoggerConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index c06239b..4b97055 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -56,19 +56,19 @@ std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &t LoggerConfiguration::LoggerConfiguration() : root_namespace_(create_default_root()), loggers(std::vector<std::shared_ptr<LoggerImpl>>()), - formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) { - logger_ = std::shared_ptr<LoggerImpl>(new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_))); + formatter_(std::make_shared < spdlog::pattern_formatter > (spdlog_default_pattern)) { + logger_ = std::shared_ptr < LoggerImpl > (new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_))); loggers.push_back(logger_); } void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) { - std::lock_guard<std::mutex> lock(mutex); + std::lock_guard < std::mutex > lock(mutex); root_namespace_ = initialize_namespaces(logger_properties); std::string spdlog_pattern; if (!logger_properties->get("spdlog.pattern", spdlog_pattern)) { spdlog_pattern = spdlog_default_pattern; } - formatter_ = std::make_shared<spdlog::pattern_formatter>(spdlog_pattern); + formatter_ = std::make_shared < spdlog::pattern_formatter > (spdlog_pattern); std::map<std::string, std::shared_ptr<spdlog::logger>> spdloggers; for (auto const & logger_impl : loggers) { 
std::shared_ptr<spdlog::logger> spdlogger; @@ -85,8 +85,8 @@ void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &lo } std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) { - std::lock_guard<std::mutex> lock(mutex); - std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(name, get_logger(logger_, root_namespace_, name, formatter_)); + std::lock_guard < std::mutex > lock(mutex); + std::shared_ptr<LoggerImpl> result = std::make_shared < LoggerImpl > (name, get_logger(logger_, root_namespace_, name, formatter_)); loggers.push_back(result); return result; } @@ -130,7 +130,7 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names } catch (const std::out_of_range &oor) { } } - sink_map[appender_name] = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files); + sink_map[appender_name] = std::make_shared < spdlog::sinks::rotating_file_sink_mt > (file_name, max_file_size, max_files); } else if ("stdout" == appender_type) { sink_map[appender_name] = spdlog::sinks::stdout_sink_mt::instance(); } else { @@ -227,7 +227,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr< if (logger != nullptr) { logger->log_debug("%s logger got sinks from namespace %s and level %s from namespace %s", name, sink_namespace_str, spdlog::level::level_names[level], level_namespace_str); } - spdlogger = std::make_shared<spdlog::logger>(name, begin(sinks), end(sinks)); + spdlogger = std::make_shared < spdlog::logger > (name, begin(sinks), end(sinks)); spdlogger->set_level(level); spdlogger->set_formatter(formatter); spdlogger->flush_on(std::max(spdlog::level::info, current_namespace->level)); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp ---------------------------------------------------------------------- diff --git 
a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp index 02ddb52..d4059d6 100644 --- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp +++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp @@ -24,8 +24,10 @@ #include <string> #include <memory> #include <sstream> +#include <functional> #include <iostream> #include <utility> +#include "core/Repository.h" #include "core/reporting/SiteToSiteProvenanceReportingTask.h" #include "../include/io/StreamFactory.h" #include "io/ClientSocket.h" @@ -51,10 +53,14 @@ void SiteToSiteProvenanceReportingTask::initialize() { RemoteProcessorGroupPort::initialize(); } -void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, +void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<core::SerializableComponent>> &records, std::string &report) { Json::Value array; - for (auto record : records) { + for (auto sercomp : records) { + std::shared_ptr<provenance::ProvenanceEventRecord> record = std::dynamic_pointer_cast < provenance::ProvenanceEventRecord > (sercomp); + if (nullptr == record) { + break; + } Json::Value recordJson; Json::Value updatedAttributesJson; Json::Value parentUuidJson; @@ -108,23 +114,32 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, return; } + logger_->log_debug("SiteToSiteProvenanceReportingTask -- onTrigger"); + if (!protocol_->bootstrap()) { // bootstrap the client protocol if needeed context->yield(); - std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor()); + std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor > 
(context->getProcessorNode().getProcessor()); logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec()); returnProtocol(std::move(protocol_)); return; } - std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> records; - std::shared_ptr<provenance::ProvenanceRepository> repo = std::static_pointer_cast<provenance::ProvenanceRepository>(context->getProvenanceRepository()); - repo->getProvenanceRecord(records, batch_size_); - if (records.size() <= 0) { + std::vector<std::shared_ptr<core::SerializableComponent>> records; + + logger_->log_debug("batch size %d records", batch_size_); + size_t deserialized = batch_size_; + std::shared_ptr<core::Repository> repo = context->getProvenanceRepository(); + std::function < std::shared_ptr<core::SerializableComponent>() > constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();}; + if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) { + logger_->log_debug("Not sending because deserialized is %d", deserialized); returnProtocol(std::move(protocol_)); return; } + logger_->log_debug("batch size %d records", batch_size_, deserialized); + + logger_->log_debug("Captured %d records", deserialized); std::string jsonStr; this->getJsonReport(context, session, records, jsonStr); if (jsonStr.length() <= 0) { @@ -141,7 +156,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, } // we transfer the record, purge the record from DB - repo->purgeProvenanceRecord(records); + repo->Delete(records); returnProtocol(std::move(protocol_)); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/repository/FileSystemRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp new file mode 100644 index 0000000..fba1fe3 --- /dev/null 
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/repository/FileSystemRepository.h"
+#include <cerrno>
+#include <cstdio>
+#include <memory>
+#include "io/FileStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Prepares the repository for use.
+ *
+ * The supplied configuration is not consulted and no state is set up here.
+ *
+ * @param configuration configuration object; unused by this implementation.
+ * @return always true.
+ */
+bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) {
+  return true;
+}
+
+/**
+ * Stops the repository. This implementation has nothing to do.
+ */
+void FileSystemRepository::stop() {
+}
+
+/**
+ * Creates a stream over the file that backs the given claim.
+ *
+ * The stream is built from the claim's full content path alone; how the file
+ * is opened is decided by that io::FileStream constructor.
+ *
+ * @param claim resource claim naming the content file; must not be null.
+ * @return a newly created io::FileStream for the claim's path.
+ */
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  return std::make_shared<io::FileStream>(claim->getContentFullPath());
+}
+
+/**
+ * Creates a stream over the file that backs the given claim.
+ *
+ * NOTE(review): the extra arguments (0, false) presumably mean "start at
+ * offset 0" and "do not open for writing" -- confirm against io::FileStream.
+ *
+ * @param claim resource claim naming the content file; must not be null.
+ * @return a newly created io::FileStream for the claim's path.
+ */
+std::shared_ptr<io::BaseStream> FileSystemRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  return std::make_shared<io::FileStream>(claim->getContentFullPath(), 0, false);
+}
+
+/**
+ * Deletes the file that backs the given claim.
+ *
+ * A file that is already missing counts as success because there is nothing
+ * left to clean up; any other failure of std::remove is reported to the
+ * caller instead of being silently discarded.
+ *
+ * @param claim resource claim naming the content file; must not be null.
+ * @return true if the file was removed or did not exist, false otherwise.
+ */
+bool FileSystemRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  // Reset errno so a stale value cannot be mistaken for the outcome of this call.
+  errno = 0;
+  if (std::remove(claim->getContentFullPath().c_str()) == 0) {
+    return true;
+  }
+  return errno == ENOENT;
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
