http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
new file mode 100644
index 0000000..3d5d19e
--- /dev/null
+++ b/libminifi/include/provenance/Provenance.h
@@ -0,0 +1,576 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROVENANCE_H__
+#define __PROVENANCE_H__
+
+#include <ftw.h>
+#include <uuid/uuid.h>
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "core/Repository.h"
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "Connection.h"
+#include "FlowFileRecord.h"
+#include "core/logging/Logger.h"
+#include "ResourceClaim.h"
+#include "io/Serializable.h"
+#include "utils/TimeUtil.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance {
+// Provenance Event Record Serialization Seg Size
+#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
+
+/**
+ * Describes one provenance event: what happened to a FlowFile, in which
+ * component, and when.  A record is populated from a FlowFile with
+ * fromFlowFile() and persisted to / restored from a core::Repository with
+ * Serialize() / DeSerialize().  Records cannot be copied.
+ */
+class ProvenanceEventRecord :
+    protected org::apache::nifi::minifi::io::Serializable {
+ public:
+  enum ProvenanceEventType {
+
+    /**
+     * A CREATE event is used when a FlowFile is generated from data that was
+     * not received from a remote system or external process
+     */
+    CREATE,
+
+    /**
+     * Indicates a provenance event for receiving data from an external process. This Event Type
+     * is expected to be the first event for a FlowFile. As such, a Processor that receives data
+     * from an external source and uses that data to replace the content of an existing FlowFile
+     * should use the {@link #FETCH} event type, rather than the RECEIVE event type.
+     */
+    RECEIVE,
+
+    /**
+     * Indicates that the contents of a FlowFile were overwritten using the contents of some
+     * external resource. This is similar to the {@link #RECEIVE} event but varies in that
+     * RECEIVE events are intended to be used as the event that introduces the FlowFile into
+     * the system, whereas FETCH is used to indicate that the contents of an existing FlowFile
+     * were overwritten.
+     */
+    FETCH,
+
+    /**
+     * Indicates a provenance event for sending data to an external process
+     */
+    SEND,
+
+    /**
+     * Indicates that the contents of a FlowFile were downloaded by a user or external entity.
+     */
+    DOWNLOAD,
+
+    /**
+     * Indicates a provenance event for the conclusion of an object's life for
+     * some reason other than object expiration
+     */
+    DROP,
+
+    /**
+     * Indicates a provenance event for the conclusion of an object's life due
+     * to the fact that the object could not be processed in a timely manner
+     */
+    EXPIRE,
+
+    /**
+     * FORK is used to indicate that one or more FlowFiles were derived from a
+     * parent FlowFile.
+     */
+    FORK,
+
+    /**
+     * JOIN is used to indicate that a single FlowFile is derived from joining
+     * together multiple parent FlowFiles.
+     */
+    JOIN,
+
+    /**
+     * CLONE is used to indicate that a FlowFile is an exact duplicate of its
+     * parent FlowFile.
+     */
+    CLONE,
+
+    /**
+     * CONTENT_MODIFIED is used to indicate that a FlowFile's content was
+     * modified in some way. When using this Event Type, it is advisable to
+     * provide details about how the content is modified.
+     */
+    CONTENT_MODIFIED,
+
+    /**
+     * ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were
+     * modified in some way. This event is not needed when another event is
+     * reported at the same time, as the other event will already contain all
+     * FlowFile attributes.
+     */
+    ATTRIBUTES_MODIFIED,
+
+    /**
+     * ROUTE is used to show that a FlowFile was routed to a specified
+     * {@link org.apache.nifi.processor.Relationship Relationship} and should provide
+     * information about why the FlowFile was routed to this relationship.
+     */
+    ROUTE,
+
+    /**
+     * Indicates a provenance event for adding additional information such as a
+     * new linkage to a new URI or UUID
+     */
+    ADDINFO,
+
+    /**
+     * Indicates a provenance event for replaying a FlowFile. The UUID of the
+     * event will indicate the UUID of the original FlowFile that is being
+     * replayed. The event will contain exactly one Parent UUID that is also the
+     * UUID of the FlowFile that is being replayed and exactly one Child UUID
+     * that is the UUID of a newly created FlowFile that will be re-queued
+     * for processing.
+     */
+    REPLAY
+  };
+
+ public:
+  /**
+   * Create a new provenance event record with a freshly generated event UUID.
+   * @param event         type of the event
+   * @param componentId   ID of the component reporting the event
+   * @param componentType type of the component reporting the event
+   */
+  ProvenanceEventRecord(ProvenanceEventType event, std::string componentId,
+                        std::string componentType)
+      : _eventType(event),
+        _eventTime(getTimeMillis()),
+        _componentId(std::move(componentId)),
+        _componentType(std::move(componentType)),
+        logger_(logging::Logger::getLogger()) {
+    char eventIdStr[37];  // 36 characters of UUID text plus the terminating NUL
+    // Generate the global UUID for the event
+    uuid_generate(_eventId);
+    uuid_unparse_lower(_eventId, eventIdStr);
+    _eventIdStr = eventIdStr;
+  }
+
+  /**
+   * Create an empty record (presumably to be filled in by DeSerialize()).  Only
+   * the event time is set; the event ID is the nil UUID with an empty string form.
+   */
+  ProvenanceEventRecord()
+      : _eventTime(getTimeMillis()),
+        logger_(logging::Logger::getLogger()) {
+    uuid_clear(_eventId);
+  }
+
+  // Destructor
+  virtual ~ProvenanceEventRecord() {
+  }
+  // Get the Event ID
+  std::string getEventId() const {
+    return _eventIdStr;
+  }
+  // Get Attributes
+  std::map<std::string, std::string> getAttributes() const {
+    return _attributes;
+  }
+  // Get Size
+  uint64_t getFileSize() const {
+    return _size;
+  }
+  // Get Offset
+  uint64_t getFileOffset() const {
+    return _offset;
+  }
+  // Get Entry Date
+  uint64_t getFlowFileEntryDate() const {
+    return _entryDate;
+  }
+  // Get Lineage Start Date
+  uint64_t getlineageStartDate() const {
+    return _lineageStartDate;
+  }
+  // Get Event Time
+  uint64_t getEventTime() const {
+    return _eventTime;
+  }
+  // Get Event Duration
+  uint64_t getEventDuration() const {
+    return _eventDuration;
+  }
+  // Set Event Duration
+  void setEventDuration(uint64_t duration) {
+    _eventDuration = duration;
+  }
+  // Get Event Type
+  ProvenanceEventType getEventType() const {
+    return _eventType;
+  }
+  // Get Component ID
+  std::string getComponentId() const {
+    return _componentId;
+  }
+  // Get Component Type
+  std::string getComponentType() const {
+    return _componentType;
+  }
+  // Get FlowFileUuid
+  std::string getFlowFileUuid() const {
+    return uuid_;
+  }
+  // Get content full path
+  std::string getContentFullPath() const {
+    return _contentFullPath;
+  }
+  // Get LineageIdentifiers
+  std::set<std::string> getLineageIdentifiers() const {
+    return _lineageIdentifiers;
+  }
+  // Get Details
+  std::string getDetails() const {
+    return _details;
+  }
+  // Set Details
+  void setDetails(std::string details) {
+    _details = std::move(details);
+  }
+  // Get TransitUri
+  std::string getTransitUri() const {
+    return _transitUri;
+  }
+  // Set TransitUri
+  void setTransitUri(std::string uri) {
+    _transitUri = std::move(uri);
+  }
+  // Get SourceSystemFlowFileIdentifier
+  std::string getSourceSystemFlowFileIdentifier() const {
+    return _sourceSystemFlowFileIdentifier;
+  }
+  // Set SourceSystemFlowFileIdentifier
+  void setSourceSystemFlowFileIdentifier(std::string identifier) {
+    _sourceSystemFlowFileIdentifier = std::move(identifier);
+  }
+  // Get Parent UUIDs
+  std::vector<std::string> getParentUuids() const {
+    return _parentUuids;
+  }
+  // Add Parent UUID (ignored if it is already recorded)
+  void addParentUuid(std::string uuid) {
+    if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid)
+        == _parentUuids.end())
+      _parentUuids.push_back(std::move(uuid));
+  }
+  // Add Parent Flow File
+  void addParentFlowFile(std::shared_ptr<core::FlowFile> flow) {
+    addParentUuid(flow->getUUIDStr());
+  }
+  // Remove Parent UUID
+  void removeParentUuid(std::string uuid) {
+    _parentUuids.erase(
+        std::remove(_parentUuids.begin(), _parentUuids.end(), uuid),
+        _parentUuids.end());
+  }
+  // Remove Parent Flow File
+  void removeParentFlowFile(std::shared_ptr<core::FlowFile> flow) {
+    removeParentUuid(flow->getUUIDStr());
+  }
+  // Get Children UUIDs
+  std::vector<std::string> getChildrenUuids() const {
+    return _childrenUuids;
+  }
+  // Add Child UUID (ignored if it is already recorded)
+  void addChildUuid(std::string uuid) {
+    if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid)
+        == _childrenUuids.end())
+      _childrenUuids.push_back(std::move(uuid));
+  }
+  // Add Child Flow File
+  void addChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
+    addChildUuid(flow->getUUIDStr());
+  }
+  // Remove Child UUID
+  void removeChildUuid(std::string uuid) {
+    _childrenUuids.erase(
+        std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid),
+        _childrenUuids.end());
+  }
+  // Remove Child Flow File
+  void removeChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
+    removeChildUuid(flow->getUUIDStr());
+  }
+  // Get AlternateIdentifierUri
+  std::string getAlternateIdentifierUri() const {
+    return _alternateIdentifierUri;
+  }
+  // Set AlternateIdentifierUri
+  void setAlternateIdentifierUri(std::string uri) {
+    _alternateIdentifierUri = std::move(uri);
+  }
+  // Get Relationship
+  std::string getRelationship() const {
+    return _relationship;
+  }
+  // Set Relationship
+  void setRelationship(std::string relation) {
+    _relationship = std::move(relation);
+  }
+  // Get sourceQueueIdentifier
+  std::string getSourceQueueIdentifier() const {
+    return _sourceQueueIdentifier;
+  }
+  // Set sourceQueueIdentifier
+  void setSourceQueueIdentifier(std::string identifier) {
+    _sourceQueueIdentifier = std::move(identifier);
+  }
+  /**
+   * Copy the FlowFile's identity, dates, lineage, attributes, size and offset
+   * into this record, plus the name of its original connection and the path
+   * of its resource claim when present.  A null flow leaves the record as is.
+   */
+  void fromFlowFile(std::shared_ptr<core::FlowFile> &flow) {
+    if (!flow)
+      return;
+    _entryDate = flow->getEntryDate();
+    _lineageStartDate = flow->getlineageStartDate();
+    _lineageIdentifiers = flow->getlineageIdentifiers();
+    uuid_ = flow->getUUIDStr();
+    _attributes = flow->getAttributes();
+    _size = flow->getSize();
+    _offset = flow->getOffset();
+    if (flow->getOriginalConnection())
+      _sourceQueueIdentifier = flow->getOriginalConnection()->getName();
+    if (flow->getResourceClaim()) {
+      _contentFullPath = flow->getResourceClaim()->getContentFullPath();
+    }
+  }
+  // Serialize and Persistent to the repository
+  bool Serialize(const std::shared_ptr<core::Repository> &repo);
+  // DeSerialize
+  bool DeSerialize(const uint8_t *buffer, const int bufferSize);
+  // DeSerialize
+  bool DeSerialize(org::apache::nifi::minifi::io::DataStream &stream) {
+    return DeSerialize(stream.getBuffer(), stream.getSize());
+  }
+  // DeSerialize
+  bool DeSerialize(const std::shared_ptr<core::Repository> &repo,
+                   std::string key);
+
+ protected:
+
+  // Event type
+  ProvenanceEventType _eventType = CREATE;
+  // Date at which the event was created
+  uint64_t _eventTime = 0;
+  // Date at which the flow file entered the flow
+  uint64_t _entryDate = 0;
+  // Date at which the origin of this flow file entered the flow
+  uint64_t _lineageStartDate = 0;
+  // Event Duration
+  uint64_t _eventDuration = 0;
+  // Component ID
+  std::string _componentId;
+  // Component Type
+  std::string _componentType;
+  // Size in bytes of the data corresponding to this flow file
+  uint64_t _size = 0;
+  // flow uuid
+  std::string uuid_;
+  // Offset to the content
+  uint64_t _offset = 0;
+  // Full path to the content
+  std::string _contentFullPath;
+  // Attributes key/values pairs for the flow record
+  std::map<std::string, std::string> _attributes;
+  // provenance ID
+  uuid_t _eventId;
+  // UUID string for all parents
+  std::set<std::string> _lineageIdentifiers;
+  // transitUri
+  std::string _transitUri;
+  // sourceSystemFlowFileIdentifier
+  std::string _sourceSystemFlowFileIdentifier;
+  // parent UUID
+  std::vector<std::string> _parentUuids;
+  // child UUID
+  std::vector<std::string> _childrenUuids;
+  // detail
+  std::string _details;
+  // sourceQueueIdentifier
+  std::string _sourceQueueIdentifier;
+  // event ID Str
+  std::string _eventIdStr;
+  // relationship
+  std::string _relationship;
+  // alternateIdentifierUri;
+  std::string _alternateIdentifierUri;
+
+ private:
+
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProvenanceEventRecord(const ProvenanceEventRecord &parent) = delete;
+  ProvenanceEventRecord &operator=(const ProvenanceEventRecord &parent) = delete;
+
+};
+
+/**
+ * Collects the provenance events raised for one component until commit()
+ * (implemented outside this header).  The reporter owns every event handed
+ * to add() and deletes it in clear() / on destruction.
+ */
+class ProvenanceReporter {
+ public:
+  /**
+   * Create a new provenance reporter associated with the process session
+   * @param repo          provenance repository kept by the reporter
+   * @param componentId   ID stamped on every event made by allocate()
+   * @param componentType type stamped on every event made by allocate()
+   */
+  ProvenanceReporter(std::shared_ptr<core::Repository> repo,
+                     std::string componentId, std::string componentType)
+      : _componentId(std::move(componentId)),
+        _componentType(std::move(componentType)),
+        logger_(logging::Logger::getLogger()),
+        repo_(std::move(repo)) {
+  }
+
+  // Destructor: deletes every event still held
+  virtual ~ProvenanceReporter() {
+    clear();
+  }
+  // Get events (a copy of the pointer set; ownership stays with the reporter)
+  std::set<ProvenanceEventRecord *> getEvents() const {
+    return _events;
+  }
+  // Add event; the reporter takes ownership and deletes it in clear()
+  void add(ProvenanceEventRecord *event) {
+    _events.insert(event);
+  }
+  // Remove event without deleting it; a pointer that is not held is ignored
+  void remove(ProvenanceEventRecord *event) {
+    _events.erase(event);
+  }
+  // Delete every held event and empty the set
+  void clear() {
+    for (ProvenanceEventRecord *event : _events) {
+      delete event;
+    }
+    _events.clear();
+  }
+  /**
+   * Create an event of the given type for this reporter's component and fill
+   * it from the FlowFile.  The event is NOT added to the reporter: the caller
+   * owns it until it is passed to add().
+   */
+  ProvenanceEventRecord *allocate(
+      ProvenanceEventRecord::ProvenanceEventType eventType,
+      std::shared_ptr<core::FlowFile> flow) {
+    // Hold the record in a unique_ptr so it is freed if fromFlowFile() throws
+    std::unique_ptr<ProvenanceEventRecord> event(
+        new ProvenanceEventRecord(eventType, _componentId, _componentType));
+    event->fromFlowFile(flow);
+    return event.release();
+  }
+  // commit
+  void commit();
+  // create
+  void create(std::shared_ptr<core::FlowFile> flow, std::string detail);
+  // route
+  void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation,
+             std::string detail, uint64_t processingDuration);
+  // modifyAttributes
+  void modifyAttributes(std::shared_ptr<core::FlowFile> flow,
+                        std::string detail);
+  // modifyContent
+  void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail,
+                     uint64_t processingDuration);
+  // clone
+  void clone(std::shared_ptr<core::FlowFile> parent,
+             std::shared_ptr<core::FlowFile> child);
+  // join
+  void join(std::vector<std::shared_ptr<core::FlowFile> > parents,
+            std::shared_ptr<core::FlowFile> child, std::string detail,
+            uint64_t processingDuration);
+  // fork
+  void fork(std::vector<std::shared_ptr<core::FlowFile> > child,
+            std::shared_ptr<core::FlowFile> parent, std::string detail,
+            uint64_t processingDuration);
+  // expire
+  void expire(std::shared_ptr<core::FlowFile> flow, std::string detail);
+  // drop
+  void drop(std::shared_ptr<core::FlowFile> flow, std::string reason);
+  // send
+  void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri,
+            std::string detail, uint64_t processingDuration, bool force);
+  // fetch
+  void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri,
+             std::string detail, uint64_t processingDuration);
+  // receive
+  void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri,
+               std::string sourceSystemFlowFileIdentifier, std::string detail,
+               uint64_t processingDuration);
+
+ protected:
+
+  // Component ID
+  std::string _componentId;
+  // Component Type
+  std::string _componentType;
+
+ private:
+
+  // Events held (and owned) by this reporter
+  std::set<ProvenanceEventRecord *> _events;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // provenance repository.
+  std::shared_ptr<core::Repository> repo_;
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProvenanceReporter(const ProvenanceReporter &parent) = delete;
+  ProvenanceReporter &operator=(const ProvenanceReporter &parent) = delete;
+};
+
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h
new file mode 100644
index 0000000..0f8ee5d
--- /dev/null
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
+#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/slice.h"
+#include "leveldb/status.h"
+#include "core/Repository.h"
+#include "core/core.h"
+#include "provenance/Provenance.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance {
+
+#define PROVENANCE_DIRECTORY "./provenance_repository"
+#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M
+#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute
+#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
+
+/**
+ * Provenance repository backed by a LevelDB database: each key maps to the
+ * bytes handed to Put().  Non-copyable.
+ */
+class ProvenanceRepository : public core::Repository,
+    public std::enable_shared_from_this<ProvenanceRepository> {
+ public:
+  /**
+   * Create a new provenance repository.  The database is not opened until
+   * initialize() is called.
+   */
+  ProvenanceRepository(std::string directory = PROVENANCE_DIRECTORY,
+                       int64_t maxPartitionMillis =
+                           MAX_PROVENANCE_ENTRY_LIFE_TIME,
+                       int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
+                       uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
+      : Repository(core::getClassName<ProvenanceRepository>(), directory,
+                   maxPartitionMillis, maxPartitionBytes, purgePeriod),
+        db_(nullptr) {
+  }
+
+  // Destructor: closes the database if it is still open
+  virtual ~ProvenanceRepository() {
+    destroy();
+  }
+
+  /**
+   * Apply the directory / max storage size / max storage time overrides found
+   * in the configuration, then open (creating it if missing) the LevelDB
+   * database in that directory.
+   * @return true when the database was opened, false otherwise
+   */
+  virtual bool initialize() {
+    std::string value;
+    if (configure_->get(Configure::nifi_provenance_repository_directory_default,
+                        value)) {
+      directory_ = value;
+    }
+    logger_->log_info("NiFi Provenance Repository Directory %s",
+                      directory_.c_str());
+    if (configure_->get(Configure::nifi_provenance_repository_max_storage_size,
+                        value)) {
+      core::Property::StringToInt(value, max_partition_bytes_);
+    }
+    // The limits are 64-bit in the constructor, so "%d" is the wrong conversion
+    // for a printf-style variadic logger; widen explicitly and print as %lld.
+    logger_->log_info("NiFi Provenance Max Partition Bytes %lld",
+                      static_cast<long long>(max_partition_bytes_));
+    if (configure_->get(Configure::nifi_provenance_repository_max_storage_time,
+                        value)) {
+      core::TimeUnit unit;
+      // As before, the result of the unit conversion itself is not checked
+      if (core::Property::StringToTime(value, max_partition_millis_, unit)) {
+        core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit,
+                                            max_partition_millis_);
+      }
+    }
+    logger_->log_info("NiFi Provenance Max Storage Time: [%lld] ms",
+                      static_cast<long long>(max_partition_millis_));
+    leveldb::Options options;
+    options.create_if_missing = true;
+    leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(),
+                                               &db_);
+    if (!status.ok()) {
+      logger_->log_error("NiFi Provenance Repository database open %s fail",
+                         directory_.c_str());
+      return false;
+    }
+    logger_->log_info("NiFi Provenance Repository database open %s success",
+                      directory_.c_str());
+    return true;
+  }
+  // Put: store bufLen bytes of buf under key; false if closed, on bad input or DB error
+  virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
+    if (db_ == nullptr || bufLen < 0 || (buf == nullptr && bufLen > 0))
+      return false;
+    // persistent to the DB
+    leveldb::Slice value(reinterpret_cast<const char *>(buf),
+                         static_cast<size_t>(bufLen));
+    return db_->Put(leveldb::WriteOptions(), key, value).ok();
+  }
+  // Delete: remove key; false if the database is closed or on DB error
+  virtual bool Delete(std::string key) {
+    if (db_ == nullptr)
+      return false;
+    return db_->Delete(leveldb::WriteOptions(), key).ok();
+  }
+  // Get: read key into value; false if the database is closed or the read fails
+  virtual bool Get(std::string key, std::string &value) {
+    if (db_ == nullptr)
+      return false;
+    return db_->Get(leveldb::ReadOptions(), key, &value).ok();
+  }
+  // Persistent event
+  void registerEvent(std::shared_ptr<ProvenanceEventRecord> &event) {
+    if (!event)
+      return;
+    event->Serialize(
+        std::static_pointer_cast<core::Repository>(shared_from_this()));
+  }
+  // Remove event
+  void removeEvent(ProvenanceEventRecord *event) {
+    if (event != nullptr)
+      Delete(event->getEventId());
+  }
+  // destroy: close the database; Put/Get/Delete return false afterwards
+  void destroy() {
+    delete db_;  // deleting a null pointer is a no-op
+    db_ = nullptr;
+  }
+  // Run function for the thread
+  void run();
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProvenanceRepository(const ProvenanceRepository &parent) = delete;
+  ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
+
+ private:
+  // LevelDB handle; null before initialize() and after destroy()
+  leveldb::DB* db_;
+
+};
+
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/utils/FailurePolicy.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/FailurePolicy.h b/libminifi/include/utils/FailurePolicy.h
index a4a7f9e..98ec18a 100644
--- a/libminifi/include/utils/FailurePolicy.h
+++ b/libminifi/include/utils/FailurePolicy.h
@@ -17,6 +17,12 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_FAILUREPOLICY_H_
 #define LIBMINIFI_INCLUDE_UTILS_FAILUREPOLICY_H_
 
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
 /**
  * Basic failure policy enumeration
  *
@@ -42,4 +48,11 @@ enum FailurePolicy {
   EXIT
 };
 
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
 #endif /* LIBMINIFI_INCLUDE_UTILS_FAILUREPOLICY_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/utils/StringUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 30858c8..82459db 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -19,7 +19,13 @@
 
 #include <algorithm>
 #include <sstream>
-#include "../utils/FailurePolicy.h"
+#include "utils/FailurePolicy.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
 
 /**
  * Stateless String utility class.
@@ -122,4 +128,11 @@ public: }; +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + + #endif /* LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/AppendHostInfo.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/AppendHostInfo.cpp b/libminifi/src/AppendHostInfo.cpp deleted file mode 100644 index d0769c1..0000000 --- a/libminifi/src/AppendHostInfo.cpp +++ /dev/null @@ -1,97 +0,0 @@ -/** - * @file AppendHostInfo.cpp - * AppendHostInfo class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include <set> -#include <sys/time.h> -#include <string.h> -#include "AppendHostInfo.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -#include <netdb.h> -#include <netinet/in.h> -#include <sys/socket.h> -#include <sys/ioctl.h> -#include <net/if.h> -#include <arpa/inet.h> - -#include "io/ClientSocket.h" - -#define __USE_POSIX -#include <limits.h> - -#ifndef HOST_NAME_MAX -#define HOST_NAME_MAX 255 -#endif - -const std::string AppendHostInfo::ProcessorName("AppendHostInfo"); -Property AppendHostInfo::InterfaceName("Network Interface Name", "Network interface from which to read an IP v4 address", "eth0"); -Property AppendHostInfo::HostAttribute("Hostname Attribute", "Flowfile attribute to used to record the agent's hostname", "source.hostname"); -Property AppendHostInfo::IPAttribute("IP Attribute", "Flowfile attribute to used to record the agent's IP address", "source.ipv4"); -Relationship AppendHostInfo::Success("success", "success operational on the flow record"); - -void AppendHostInfo::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(InterfaceName); - properties.insert(HostAttribute); - properties.insert(IPAttribute); - setSupportedProperties(properties); - - //! 
Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void AppendHostInfo::onTrigger(ProcessContext *context, ProcessSession *session) -{ - FlowFileRecord *flow = session->get(); - if (!flow) - return; - - //Get Hostname - - std::string hostAttribute = ""; - context->getProperty(HostAttribute.getName(), hostAttribute); - flow->addAttribute(hostAttribute.c_str(), Socket::getMyHostName()); - - //Get IP address for the specified interface - std::string iface; - context->getProperty(InterfaceName.getName(), iface); - //Confirm the specified interface name exists on this device - if (if_nametoindex(iface.c_str()) != 0){ - struct ifreq ifr; - int fd = socket(AF_INET, SOCK_DGRAM, 0); - //Type of address to retrieve - IPv4 IP address - ifr.ifr_addr.sa_family = AF_INET; - //Copy the interface name in the ifreq structure - strncpy(ifr.ifr_name , iface.c_str(), IFNAMSIZ-1); - ioctl(fd, SIOCGIFADDR, &ifr); - close(fd); - - std::string ipAttribute; - context->getProperty(IPAttribute.getName(), ipAttribute); - flow->addAttribute(ipAttribute.c_str(), inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr)); - } - - // Transfer to the relationship - session->transfer(flow, Success); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/BaseLogger.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/BaseLogger.cpp b/libminifi/src/BaseLogger.cpp deleted file mode 100644 index 1b3b2fd..0000000 --- a/libminifi/src/BaseLogger.cpp +++ /dev/null @@ -1,153 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "BaseLogger.h" - -// Logger related configuration items. -const char *BaseLogger::nifi_log_level = "nifi.log.level"; -const char *BaseLogger::nifi_log_appender = "nifi.log.appender"; - -/** - * @brief Log error message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void BaseLogger::log_error(const char * const format, ...) { - if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::err)) - return; - FILL_BUFFER - log_str(err,buffer); -} -/** - * @brief Log warn message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void BaseLogger::log_warn(const char * const format, ...) { - if (logger_ == NULL - || !logger_->should_log(spdlog::level::level_enum::warn)) - return; - FILL_BUFFER - log_str(warn,buffer); -} -/** - * @brief Log info message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void BaseLogger::log_info(const char * const format, ...) 
{ - if (logger_ == NULL - || !logger_->should_log(spdlog::level::level_enum::info)) - return; - FILL_BUFFER - log_str(info,buffer); -} -/** - * @brief Log debug message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void BaseLogger::log_debug(const char * const format, ...) { - - if (logger_ == NULL - || !logger_->should_log(spdlog::level::level_enum::debug)) - return; - FILL_BUFFER - log_str(debug,buffer); -} -/** - * @brief Log trace message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void BaseLogger::log_trace(const char * const format, ...) { - - if (logger_ == NULL - || !logger_->should_log(spdlog::level::level_enum::trace)) - return; - FILL_BUFFER - log_str(debug,buffer); -} - -// overridables - -/** - * @brief Log error message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. 
Caller must ensure parameters and format string lengths match - */ -void BaseLogger::log_str(LOG_LEVEL_E level, const std::string &buffer) { - switch (level) { - case err: - case critical: - if (stderr_ != nullptr) { - stderr_->error(buffer); - } else { - logger_->error(buffer); - } - break; - case warn: - logger_->warn(buffer); - break; - case info: - logger_->info(buffer); - break; - case debug: - logger_->debug(buffer); - break; - case trace: - logger_->trace(buffer); - break; - case off: - break; - default: - logger_->info(buffer); - break; - } - -} - -void BaseLogger::setLogLevel(const std::string &level, - LOG_LEVEL_E defaultLevel) { - std::string logLevel = level; - std::transform(logLevel.begin(), logLevel.end(), logLevel.begin(), - ::tolower); - - if (logLevel == "trace") { - setLogLevel(trace); - } else if (logLevel == "debug") { - setLogLevel(debug); - } else if (logLevel == "info") { - setLogLevel(info); - } else if (logLevel == "warn") { - setLogLevel(warn); - } else if (logLevel == "error") { - setLogLevel(err); - } else if (logLevel == "critical") { - setLogLevel(critical); - } else if (logLevel == "off") { - setLogLevel(off); - } else { - setLogLevel(defaultLevel); - } -} - -void BaseLogger::set_error_logger(std::shared_ptr<spdlog::logger> other) { - stderr_ = std::move(other); -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 6f5c08d..96ed7c7 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -1,6 +1,4 @@ /** - * @file Configure.cpp - * Configure class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with @@ -17,26 +15,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "Configure.h" +#include "properties/Configure.h" #include "utils/StringUtils.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { Configure *Configure::configure_(NULL); const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration"; -const char *Configure::nifi_graceful_shutdown_seconds = "nifi.graceful.shutdown.seconds"; +const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period"; const char *Configure::nifi_log_level = "nifi.log.level"; const char *Configure::nifi_server_name = "nifi.server.name"; +const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name"; +const char *Configure::nifi_flow_repository_class_name = "nifi.flow.repository.class.name"; +const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name"; const char *Configure::nifi_server_port = "nifi.server.port"; const char *Configure::nifi_server_report_interval= "nifi.server.report.interval"; const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size"; const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time"; const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default"; -const char *Configure::nifi_provenance_repository_enable = "nifi.provenance.repository.enable"; const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size"; const char 
*Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time"; const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default"; -const char *Configure::nifi_flowfile_repository_enable = "nifi.flowfile.repository.enable"; const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure"; const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate"; @@ -44,13 +49,13 @@ const char *Configure::nifi_security_client_private_key = "nifi.security.client. const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase"; const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate"; -//! Get the config value +// Get the config value bool Configure::get(std::string key, std::string &value) { - std::lock_guard<std::mutex> lock(_mtx); - auto it = _properties.find(key); + std::lock_guard<std::mutex> lock(mutex_); + auto it = properties_.find(key); - if (it != _properties.end()) + if (it != properties_.end()) { value = it->second; return true; @@ -62,7 +67,7 @@ bool Configure::get(std::string key, std::string &value) } -//! Parse one line in configure file like key=value +// Parse one line in configure file like key=value void Configure::parseConfigureFileLine(char *buf) { char *line = buf; @@ -96,12 +101,12 @@ void Configure::parseConfigureFileLine(char *buf) } std::string value = equal; - key = StringUtils::trimRight(key); - value = StringUtils::trimRight(value); + key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key); + value = org::apache::nifi::minifi::utils::StringUtils::trimRight(value); set(key, value); } -//! 
Load Configure File +// Load Configure File void Configure::loadConfigureFile(const char *fileName) { @@ -138,7 +143,7 @@ void Configure::loadConfigureFile(const char *fileName) } } -//! Parse Command Line +// Parse Command Line void Configure::parseCommandLine(int argc, char **argv) { int i; @@ -162,3 +167,8 @@ void Configure::parseCommandLine(int argc, char **argv) } return; } + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index 42dbfe4..6f64ff3 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -27,185 +27,161 @@ #include <thread> #include <iostream> +#include "core/FlowFile.h" #include "Connection.h" -#include "Processor.h" -#include "FlowFileRepository.h" -#include "FlowController.h" - -Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID) -: _name(name) -{ - if (!uuid) - // Generate the global UUID for the flow record - uuid_generate(_uuid); - else - uuid_copy(_uuid, uuid); - - if (srcUUID) - uuid_copy(_srcUUID, srcUUID); - if (destUUID) - uuid_copy(_destUUID, destUUID); - - _srcProcessor = NULL; - _destProcessor = NULL; - _maxQueueSize = 0; - _maxQueueDataSize = 0; - _expiredDuration = 0; - _queuedDataSize = 0; - - logger_ = Logger::getLogger(); - - char uuidStr[37]; - uuid_unparse_lower(_uuid, uuidStr); - _uuidStr = uuidStr; - - logger_->log_info("Connection %s created", _name.c_str()); +#include "core/Processor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +Connection::Connection(std::shared_ptr<core::Repository> flow_repository, + std::string name, uuid_t uuid, uuid_t srcUUID, + uuid_t destUUID) + : core::Connectable(name, uuid), + flow_repository_(flow_repository) { + 
+ if (srcUUID) + uuid_copy(src_uuid_, srcUUID); + if (destUUID) + uuid_copy(dest_uuid_, destUUID); + + source_connectable_ = nullptr; + dest_connectable_ = nullptr; + max_queue_size_ = 0; + max_data_queue_size_ = 0; + expired_duration_ = 0; + queued_data_size_ = 0; + + logger_ = logging::Logger::getLogger(); + + logger_->log_info("Connection %s created", name_.c_str()); } -bool Connection::isEmpty() -{ - std::lock_guard<std::mutex> lock(_mtx); +bool Connection::isEmpty() { + std::lock_guard<std::mutex> lock(mutex_); - return _queue.empty(); + return queue_.empty(); } -bool Connection::isFull() -{ - std::lock_guard<std::mutex> lock(_mtx); +bool Connection::isFull() { + std::lock_guard<std::mutex> lock(mutex_); - if (_maxQueueSize <= 0 && _maxQueueDataSize <= 0) - // No back pressure setting - return false; + if (max_queue_size_ <= 0 && max_data_queue_size_ <= 0) + // No back pressure setting + return false; - if (_maxQueueSize > 0 && _queue.size() >= _maxQueueSize) - return true; + if (max_queue_size_ > 0 && queue_.size() >= max_queue_size_) + return true; - if (_maxQueueDataSize > 0 && _queuedDataSize >= _maxQueueDataSize) - return true; + if (max_data_queue_size_ > 0 && queued_data_size_ >= max_data_queue_size_) + return true; - return false; + return false; } -void Connection::put(FlowFileRecord *flow) -{ - { - std::lock_guard<std::mutex> lock(_mtx); - - _queue.push(flow); - - _queuedDataSize += flow->getSize(); - - logger_->log_debug("Enqueue flow file UUID %s to connection %s", - flow->getUUIDStr().c_str(), _name.c_str()); - } - - - if (FlowControllerFactory::getFlowController()->getFlowFileRepository() && - FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable() && - !flow->isStoredToRepository()) - { - // Save to the flowfile repo - FlowFileEventRecord event; - event.fromFlowFile(flow, this->_uuidStr); - if (event.Serialize( - FlowControllerFactory::getFlowController()->getFlowFileRepository())) - { - flow->setStoredToRepository(true); 
- } - } - - // Notify receiving processor that work may be available - if(_destProcessor) - { - _destProcessor->notifyWork(); - } +void Connection::put(std::shared_ptr<core::FlowFile> flow) { + { + std::lock_guard<std::mutex> lock(mutex_); + + queue_.push(flow); + + queued_data_size_ += flow->getSize(); + + logger_->log_debug("Enqueue flow file UUID %s to connection %s", + flow->getUUIDStr().c_str(), name_.c_str()); + } + + if (!flow->isStored()) { + // Save to the flowfile repo + FlowFileRecord event(flow_repository_,flow,this->uuidStr_); + if (event.Serialize()) { + flow->setStoredToRepository(true); + } + } + + // Notify receiving processor that work may be available + if (dest_connectable_) { + dest_connectable_->notifyWork(); + } } -FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords) -{ - std::lock_guard<std::mutex> lock(_mtx); - - while (!_queue.empty()) - { - FlowFileRecord *item = _queue.front(); - _queue.pop(); - _queuedDataSize -= item->getSize(); - - if (_expiredDuration > 0) - { - // We need to check for flow expiration - if (getTimeMillis() > (item->getEntryDate() + _expiredDuration)) - { - // Flow record expired - expiredFlowRecords.insert(item); - if (FlowControllerFactory::getFlowController()->getFlowFileRepository() && - FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable()) - { - // delete from the flowfile repo - FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr()); - item->setStoredToRepository(false); - } - } - else - { - // Flow record not expired - if (item->isPenalized()) - { - // Flow record was penalized - _queue.push(item); - _queuedDataSize += item->getSize(); - break; - } - item->setOriginalConnection(this); - logger_->log_debug("Dequeue flow file UUID %s from connection %s", - item->getUUIDStr().c_str(), _name.c_str()); - if (FlowControllerFactory::getFlowController()->getFlowFileRepository() && - 
FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable()) - { - // delete from the flowfile repo - FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr()); - item->setStoredToRepository(false); - } - return item; - } - } - else - { - // Flow record not expired - if (item->isPenalized()) - { - // Flow record was penalized - _queue.push(item); - _queuedDataSize += item->getSize(); - break; - } - item->setOriginalConnection(this); - logger_->log_debug("Dequeue flow file UUID %s from connection %s", - item->getUUIDStr().c_str(), _name.c_str()); - if (FlowControllerFactory::getFlowController()->getFlowFileRepository() && - FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable()) - { - // delete from the flowfile repo - FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr()); - item->setStoredToRepository(false); - } - return item; - } - } - - return NULL; +std::shared_ptr<core::FlowFile> Connection::poll( + std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) { + std::lock_guard<std::mutex> lock(mutex_); + + while (!queue_.empty()) { + std::shared_ptr<core::FlowFile> item = queue_.front(); + queue_.pop(); + queued_data_size_ -= item->getSize(); + + if (expired_duration_ > 0) { + // We need to check for flow expiration + if (getTimeMillis() > (item->getEntryDate() + expired_duration_)) { + // Flow record expired + expiredFlowRecords.insert(item); + if (flow_repository_->Delete(item->getUUIDStr())) { + item->setStoredToRepository(false); + } + } else { + // Flow record not expired + if (item->isPenalized()) { + // Flow record was penalized + queue_.push(item); + queued_data_size_ += item->getSize(); + break; + } + std::shared_ptr<Connectable> connectable = std::static_pointer_cast< + Connectable>(shared_from_this()); + item->setOriginalConnection(connectable); + logger_->log_debug("Dequeue flow file UUID %s from connection %s", + 
item->getUUIDStr().c_str(), name_.c_str()); + + // delete from the flowfile repo + if (flow_repository_->Delete(item->getUUIDStr())) { + item->setStoredToRepository(false); + } + + return item; + } + } else { + // Flow record not expired + if (item->isPenalized()) { + // Flow record was penalized + queue_.push(item); + queued_data_size_ += item->getSize(); + break; + } + std::shared_ptr<Connectable> connectable = std::static_pointer_cast< + Connectable>(shared_from_this()); + item->setOriginalConnection(connectable); + logger_->log_debug("Dequeue flow file UUID %s from connection %s", + item->getUUIDStr().c_str(), name_.c_str()); + // delete from the flowfile repo + if (flow_repository_->Delete(item->getUUIDStr())) { + item->setStoredToRepository(false); + } + + return item; + } + } + + return NULL; } -void Connection::drain() -{ - std::lock_guard<std::mutex> lock(_mtx); +void Connection::drain() { + std::lock_guard<std::mutex> lock(mutex_); - while (!_queue.empty()) - { - FlowFileRecord *item = _queue.front(); - _queue.pop(); - delete item; - } + while (!queue_.empty()) { + auto &&item = queue_.front(); + queue_.pop(); + } - logger_->log_debug("Drain connection %s", _name.c_str()); + logger_->log_debug("Drain connection %s", name_.c_str()); } + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/EventDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index 53bde4e..0484139 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -20,28 +20,43 @@ #include <chrono> #include <thread> #include <iostream> -#include "Property.h" #include "EventDrivenSchedulingAgent.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include 
"core/ProcessSessionFactory.h" +#include "core/Property.h" -void EventDrivenSchedulingAgent::run(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory) -{ - while (this->_running) - { - bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); +namespace org { +namespace apache { +namespace nifi { +namespace minifi { - if (processor->isYield()) - { - // Honor the yield - std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); - } - else if (shouldYield && this->_boredYieldDuration > 0) - { - // No work to do or need to apply back pressure - std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration)); - } - // Block until work is available - processor->waitForWork(1000); - } - return; +void EventDrivenSchedulingAgent::run( + std::shared_ptr<core::Processor> processor, + core::ProcessContext *processContext, + core::ProcessSessionFactory *sessionFactory) { + while (this->running_) { + bool shouldYield = this->onTrigger(processor, processContext, + sessionFactory); + + if (processor->isYield()) { + // Honor the yield + std::this_thread::sleep_for( + std::chrono::milliseconds(processor->getYieldTime())); + } else if (shouldYield && this->_boredYieldDuration > 0) { + // No work to do or need to apply back pressure + std::this_thread::sleep_for( + std::chrono::milliseconds(this->_boredYieldDuration)); + } + + // Block until work is available + processor->waitForWork(1000); + } + return; } + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ExecuteProcess.cpp b/libminifi/src/ExecuteProcess.cpp deleted file mode 100644 index 61f96d5..0000000 --- a/libminifi/src/ExecuteProcess.cpp +++ /dev/null @@ -1,251 +0,0 @@ -/** - * 
@file ExecuteProcess.cpp - * ExecuteProcess class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "ExecuteProcess.h" -#include "ProcessContext.h" -#include "ProcessSession.h" -#include <cstring> -#include "utils/StringUtils.h" -#include "utils/TimeUtil.h" - -const std::string ExecuteProcess::ProcessorName("ExecuteProcess"); -Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.", ""); -Property ExecuteProcess::CommandArguments("Command Arguments", - "The arguments to supply to the executable delimited by white space. 
White space can be escaped by enclosing it in double-quotes.", ""); -Property ExecuteProcess::WorkingDir("Working Directory", - "The directory to use as the current working directory when executing the command", ""); -Property ExecuteProcess::BatchDuration("Batch Duration", - "If the process is expected to be long-running and produce textual output, a batch duration can be specified.", "0"); -Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream", - "If true will redirect any error stream output of the process to the output stream.", "false"); -Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship."); - -void ExecuteProcess::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(Command); - properties.insert(CommandArguments); - properties.insert(WorkingDir); - properties.insert(BatchDuration); - properties.insert(RedirectErrorStream); - setSupportedProperties(properties); - //! 
Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - - -void ExecuteProcess::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - if (context->getProperty(Command.getName(), value)) - { - this->_command = value; - } - if (context->getProperty(CommandArguments.getName(), value)) - { - this->_commandArgument = value; - } - if (context->getProperty(WorkingDir.getName(), value)) - { - this->_workingDir = value; - } - if (context->getProperty(BatchDuration.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _batchDuration, unit) && - Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration)) - { - - } - } - if (context->getProperty(RedirectErrorStream.getName(), value)) - { - StringUtils::StringToBool(value, _redirectErrorStream); - } - this->_fullCommand = _command + " " + _commandArgument; - if (_fullCommand.length() == 0) - { - yield(); - return; - } - if (_workingDir.length() > 0 && _workingDir != ".") - { - // change to working directory - if (chdir(_workingDir.c_str()) != 0) - { - logger_->log_error("Execute Command can not chdir %s", _workingDir.c_str()); - yield(); - return; - } - } - logger_->log_info("Execute Command %s", _fullCommand.c_str()); - // split the command into array - char cstr[_fullCommand.length()+1]; - std::strcpy(cstr, _fullCommand.c_str()); - char *p = std::strtok (cstr, " "); - int argc = 0; - char *argv[64]; - while (p != 0 && argc < 64) - { - argv[argc] = p; - p = std::strtok(NULL, " "); - argc++; - } - argv[argc] = NULL; - int status, died; - if (!_processRunning) - { - _processRunning = true; - // if the process has not launched yet - // create the pipe - if (pipe(_pipefd) == -1) - { - _processRunning = false; - yield(); - return; - } - switch (_pid = fork()) - { - case -1: - logger_->log_error("Execute Process fork failed"); - _processRunning = false; - close(_pipefd[0]); 
- close(_pipefd[1]); - yield(); - break; - case 0 : // this is the code the child runs - close(1); // close stdout - dup(_pipefd[1]); // points pipefd at file descriptor - if (_redirectErrorStream) - // redirect stderr - dup2(_pipefd[1], 2); - close(_pipefd[0]); - execvp(argv[0], argv); - exit(1); - break; - default: // this is the code the parent runs - // the parent isn't going to write to the pipe - close(_pipefd[1]); - if (_batchDuration > 0) - { - while (1) - { - std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration)); - char buffer[4096]; - int numRead = read(_pipefd[0], buffer, sizeof(buffer)); - if (numRead <= 0) - break; - logger_->log_info("Execute Command Respond %d", numRead); - ExecuteProcess::WriteCallback callback(buffer, numRead); - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - continue; - flowFile->addAttribute("command", _command.c_str()); - flowFile->addAttribute("command.arguments", _commandArgument.c_str()); - session->write(flowFile, &callback); - session->transfer(flowFile, Success); - session->commit(); - } - } - else - { - char buffer[4096]; - char *bufPtr = buffer; - int totalRead = 0; - FlowFileRecord *flowFile = NULL; - while (1) - { - int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead)); - if (numRead <= 0) - { - if (totalRead > 0) - { - logger_->log_info("Execute Command Respond %d", totalRead); - // child exits and close the pipe - ExecuteProcess::WriteCallback callback(buffer, totalRead); - if (!flowFile) - { - flowFile = session->create(); - if (!flowFile) - break; - flowFile->addAttribute("command", _command.c_str()); - flowFile->addAttribute("command.arguments", _commandArgument.c_str()); - session->write(flowFile, &callback); - } - else - { - session->append(flowFile, &callback); - } - session->transfer(flowFile, Success); - } - break; - } - else - { - if (numRead == (sizeof(buffer) - totalRead)) - { - // we reach the max buffer size - logger_->log_info("Execute Command Max 
Respond %d", sizeof(buffer)); - ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer)); - if (!flowFile) - { - flowFile = session->create(); - if (!flowFile) - continue; - flowFile->addAttribute("command", _command.c_str()); - flowFile->addAttribute("command.arguments", _commandArgument.c_str()); - session->write(flowFile, &callback); - } - else - { - session->append(flowFile, &callback); - } - // Rewind - totalRead = 0; - bufPtr = buffer; - } - else - { - totalRead += numRead; - bufPtr += numRead; - } - } - } - } - - died= wait(&status); - if (WIFEXITED(status)) - { - logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid); - } - else - { - logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid); - } - - close(_pipefd[0]); - _processRunning = false; - break; - } - } -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index 22ef1f9..50fc0e2 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -27,493 +27,471 @@ #include <iostream> #include "FlowController.h" #include "FlowControlProtocol.h" - -int FlowControlProtocol::connectServer(const char *host, uint16_t port) -{ - in_addr_t addr; - int sock = 0; - struct hostent *h; +#include "core/core.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +int FlowControlProtocol::connectServer(const char *host, uint16_t port) { + in_addr_t addr; + int sock = 0; + struct hostent *h; #ifdef __MACH__ - h = gethostbyname(host); + h = gethostbyname(host); #else - char buf[1024]; - struct hostent he; - int hh_errno; - gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); + char buf[1024]; + struct hostent he; + int 
hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); #endif - memcpy((char *) &addr, h->h_addr_list[0], h->h_length); - sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) - { - logger_->log_error("Could not create socket to hostName %s", host); - return 0; - } + memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + logger_->log_error("Could not create socket to hostName %s", host); + return 0; + } #ifndef __MACH__ - int opt = 1; - bool nagle_off = true; - - if (nagle_off) - { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) - { - logger_->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - return 0; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&opt, sizeof(opt)) < 0) - { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return 0; - } - } - - int sndsize = 256*1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) - { - logger_->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return 0; - } + int opt = 1; + bool nagle_off = true; + + if (nagle_off) + { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) + { + logger_->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + return 0; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&opt, sizeof(opt)) < 0) + { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return 0; + } + } + + int sndsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) + { + logger_->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return 0; + } #endif - struct sockaddr_in sa; - socklen_t socklen; - int status; - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = htonl(INADDR_ANY); - sa.sin_port = htons(0); - socklen = sizeof(sa); - if (bind(sock, (struct 
sockaddr *)&sa, socklen) < 0) - { - logger_->log_error("socket bind failed"); - close(sock); - return 0; - } - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = addr; - sa.sin_port = htons(port); - socklen = sizeof(sa); - - status = connect(sock, (struct sockaddr *)&sa, socklen); - - if (status < 0) - { - logger_->log_error("socket connect failed to %s %d", host, port); - close(sock); - return 0; - } - - logger_->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port); - - return sock; + struct sockaddr_in sa; + socklen_t socklen; + int status; + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = htonl(INADDR_ANY); + sa.sin_port = htons(0); + socklen = sizeof(sa); + if (bind(sock, (struct sockaddr *) &sa, socklen) < 0) { + logger_->log_error("socket bind failed"); + close(sock); + return 0; + } + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = addr; + sa.sin_port = htons(port); + socklen = sizeof(sa); + + status = connect(sock, (struct sockaddr *) &sa, socklen); + + if (status < 0) { + logger_->log_error("socket connect failed to %s %d", host, port); + close(sock); + return 0; + } + + logger_->log_info( + "Flow Control Protocol socket %d connect to server %s port %d success", + sock, host, port); + + return sock; } -int FlowControlProtocol::sendData(uint8_t *buf, int buflen) -{ - int ret = 0, bytes = 0; - - while (bytes < buflen) - { - ret = send(_socket, buf+bytes, buflen-bytes, 0); - //check for errors - if (ret == -1) - { - return ret; - } - bytes+=ret; - } - - return bytes; +int FlowControlProtocol::sendData(uint8_t *buf, int buflen) { + int ret = 0, bytes = 0; + + while (bytes < buflen) { + ret = send(_socket, buf + bytes, buflen - bytes, 0); + //check for errors + if (ret == -1) { + return ret; + } + bytes += ret; + } + + return bytes; } -int FlowControlProtocol::selectClient(int msec) -{ - fd_set fds; - struct timeval tv; - 
int retval; - int fd = _socket; - - FD_ZERO(&fds); - FD_SET(fd, &fds); - - tv.tv_sec = msec/1000; - tv.tv_usec = (msec % 1000) * 1000; - - if (msec > 0) - retval = select(fd+1, &fds, NULL, NULL, &tv); - else - retval = select(fd+1, &fds, NULL, NULL, NULL); - - if (retval <= 0) - return retval; - if (FD_ISSET(fd, &fds)) - return retval; - else - return 0; +int FlowControlProtocol::selectClient(int msec) { + fd_set fds; + struct timeval tv; + int retval; + int fd = _socket; + + FD_ZERO(&fds); + FD_SET(fd, &fds); + + tv.tv_sec = msec / 1000; + tv.tv_usec = (msec % 1000) * 1000; + + if (msec > 0) + retval = select(fd + 1, &fds, NULL, NULL, &tv); + else + retval = select(fd + 1, &fds, NULL, NULL, NULL); + + if (retval <= 0) + return retval; + if (FD_ISSET(fd, &fds)) + return retval; + else + return 0; } -int FlowControlProtocol::readData(uint8_t *buf, int buflen) -{ - int sendSize = buflen; - - while (buflen) - { - int status; - status = selectClient(MAX_READ_TIMEOUT); - if (status <= 0) - { - return status; - } +int FlowControlProtocol::readData(uint8_t *buf, int buflen) { + int sendSize = buflen; + + while (buflen) { + int status; + status = selectClient(MAX_READ_TIMEOUT); + if (status <= 0) { + return status; + } #ifndef __MACH__ - status = read(_socket, buf, buflen); + status = read(_socket, buf, buflen); #else - status = recv(_socket, buf, buflen, 0); + status = recv(_socket, buf, buflen, 0); #endif - if (status <= 0) - { - return status; - } - buflen -= status; - buf += status; - } - - return sendSize; + if (status <= 0) { + return status; + } + buflen -= status; + buf += status; + } + + return sendSize; } -int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr) -{ - uint8_t buffer[sizeof(FlowControlProtocolHeader)]; +int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr) { + uint8_t buffer[sizeof(FlowControlProtocolHeader)]; - uint8_t *data = buffer; + uint8_t *data = buffer; - int status = readData(buffer, sizeof(FlowControlProtocolHeader)); - 
if (status <= 0) - return status; + int status = readData(buffer, sizeof(FlowControlProtocolHeader)); + if (status <= 0) + return status; - uint32_t value; - data = this->decode(data, value); - hdr->msgType = value; + uint32_t value; + data = this->decode(data, value); + hdr->msgType = value; - data = this->decode(data, value); - hdr->seqNumber = value; + data = this->decode(data, value); + hdr->seqNumber = value; - data = this->decode(data, value); - hdr->status = value; + data = this->decode(data, value); + hdr->status = value; - data = this->decode(data, value); - hdr->payloadLen = value; + data = this->decode(data, value); + hdr->payloadLen = value; - return sizeof(FlowControlProtocolHeader); + return sizeof(FlowControlProtocolHeader); } -void FlowControlProtocol::start() -{ - if (_reportInterval <= 0) - return; - if (_running) - return; - _running = true; - logger_->log_info("FlowControl Protocol Start"); - _thread = new std::thread(run, this); - _thread->detach(); +void FlowControlProtocol::start() { + if (_reportInterval <= 0) + return; + if (running_) + return; + running_ = true; + logger_->log_info("FlowControl Protocol Start"); + _thread = new std::thread(run, this); + _thread->detach(); } -void FlowControlProtocol::stop() -{ - if (!_running) - return; - _running = false; - logger_->log_info("FlowControl Protocol Stop"); +void FlowControlProtocol::stop() { + if (!running_) + return; + running_ = false; + logger_->log_info("FlowControl Protocol Stop"); } -void FlowControlProtocol::run(FlowControlProtocol *protocol) -{ - while (protocol->_running) - { - std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval)); - if (!protocol->_registered) - { - // if it is not register yet - protocol->sendRegisterReq(); - } - else - protocol->sendReportReq(); - } - return; +void FlowControlProtocol::run(FlowControlProtocol *protocol) { + while (protocol->running_) { + std::this_thread::sleep_for( + 
std::chrono::milliseconds(protocol->_reportInterval)); + if (!protocol->_registered) { + // if it is not register yet + protocol->sendRegisterReq(); + } else + protocol->sendReportReq(); + } + return; } -int FlowControlProtocol::sendRegisterReq() -{ - if (_registered) - { - logger_->log_info("Already registered"); - return -1; - } - - uint16_t port = this->_serverPort; - - if (this->_socket <= 0) - this->_socket = connectServer(_serverName.c_str(), port); - - if (this->_socket <= 0) - return -1; - - // Calculate the total payload msg size - uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) + - FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size()+1); - uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; - - uint8_t *data = new uint8_t[size]; - uint8_t *start = data; - - // encode the HDR - FlowControlProtocolHeader hdr; - hdr.msgType = REGISTER_REQ; - hdr.payloadLen = payloadSize; - hdr.seqNumber = this->_seqNumber; - hdr.status = RESP_SUCCESS; - data = this->encode(data, hdr.msgType); - data = this->encode(data, hdr.seqNumber); - data = this->encode(data, hdr.status); - data = this->encode(data, hdr.payloadLen); - - // encode the serial number - data = this->encode(data, FLOW_SERIAL_NUMBER); - data = this->encode(data, this->_serialNumber, 8); - - // encode the YAML name - data = this->encode(data, FLOW_YML_NAME); - data = this->encode(data, this->_controller->getName()); - - // send it - int status = sendData(start, size); - delete[] start; - if (status <= 0) - { - close(_socket); - _socket = 0; - logger_->log_error("Flow Control Protocol Send Register Req failed"); - return -1; - } - - // Looking for register respond - status = readHdr(&hdr); - - if (status <= 0) - { - close(_socket); - _socket = 0; - logger_->log_error("Flow Control Protocol Read Register Resp header failed"); - return -1; - } - logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) 
hdr.msgType)); - logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); - logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); - - if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) - { - this->_registered = true; - this->_seqNumber++; - logger_->log_info("Flow Control Protocol Register success"); - uint8_t *payload = new uint8_t[hdr.payloadLen]; - uint8_t *payloadPtr = payload; - status = readData(payload, hdr.payloadLen); - if (status <= 0) - { - delete[] payload; - logger_->log_info("Flow Control Protocol Register Read Payload fail"); - close(_socket); - _socket = 0; - return -1; - } - while (payloadPtr < (payload + hdr.payloadLen)) - { - uint32_t msgID; - payloadPtr = this->decode(payloadPtr, msgID); - if (((FlowControlMsgID) msgID) == REPORT_INTERVAL) - { - // Fixed 4 bytes - uint32_t reportInterval; - payloadPtr = this->decode(payloadPtr, reportInterval); - logger_->log_info("Flow Control Protocol receive report interval %d ms", reportInterval); - this->_reportInterval = reportInterval; - } - else - { - break; - } - } - delete[] payload; - close(_socket); - _socket = 0; - return 0; - } - else - { - logger_->log_info("Flow Control Protocol Register fail"); - close(_socket); - _socket = 0; - return -1; - } +int FlowControlProtocol::sendRegisterReq() { + if (_registered) { + logger_->log_info("Already registered"); + return -1; + } + + uint16_t port = this->_serverPort; + + if (this->_socket <= 0) + this->_socket = connectServer(_serverName.c_str(), port); + + if (this->_socket <= 0) + return -1; + + // Calculate the total payload msg size + uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) + + FlowControlMsgIDEncodingLen(FLOW_YML_NAME, + this->_controller->getName().size() + 1); + uint32_t size = sizeof(FlowControlProtocolHeader) + 
payloadSize; + + uint8_t *data = new uint8_t[size]; + uint8_t *start = data; + + // encode the HDR + FlowControlProtocolHeader hdr; + hdr.msgType = REGISTER_REQ; + hdr.payloadLen = payloadSize; + hdr.seqNumber = this->_seqNumber; + hdr.status = RESP_SUCCESS; + data = this->encode(data, hdr.msgType); + data = this->encode(data, hdr.seqNumber); + data = this->encode(data, hdr.status); + data = this->encode(data, hdr.payloadLen); + + // encode the serial number + data = this->encode(data, FLOW_SERIAL_NUMBER); + data = this->encode(data, this->_serialNumber, 8); + + // encode the YAML name + data = this->encode(data, FLOW_YML_NAME); + data = this->encode(data, this->_controller->getName()); + + // send it + int status = sendData(start, size); + delete[] start; + if (status <= 0) { + close(_socket); + _socket = 0; + logger_->log_error("Flow Control Protocol Send Register Req failed"); + return -1; + } + + // Looking for register respond + status = readHdr(&hdr); + + if (status <= 0) { + close(_socket); + _socket = 0; + logger_->log_error( + "Flow Control Protocol Read Register Resp header failed"); + return -1; + } + logger_->log_info("Flow Control Protocol receive MsgType %s", + FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); + logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); + logger_->log_info("Flow Control Protocol receive Resp Code %s", + FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); + logger_->log_info("Flow Control Protocol receive Payload len %d", + hdr.payloadLen); + + if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) { + this->_registered = true; + this->_seqNumber++; + logger_->log_info("Flow Control Protocol Register success"); + uint8_t *payload = new uint8_t[hdr.payloadLen]; + uint8_t *payloadPtr = payload; + status = readData(payload, hdr.payloadLen); + if (status <= 0) { + delete[] payload; + logger_->log_info("Flow Control Protocol Register Read Payload fail"); + close(_socket); 
+ _socket = 0; + return -1; + } + while (payloadPtr < (payload + hdr.payloadLen)) { + uint32_t msgID; + payloadPtr = this->decode(payloadPtr, msgID); + if (((FlowControlMsgID) msgID) == REPORT_INTERVAL) { + // Fixed 4 bytes + uint32_t reportInterval; + payloadPtr = this->decode(payloadPtr, reportInterval); + logger_->log_info("Flow Control Protocol receive report interval %d ms", + reportInterval); + this->_reportInterval = reportInterval; + } else { + break; + } + } + delete[] payload; + close(_socket); + _socket = 0; + return 0; + } else { + logger_->log_info("Flow Control Protocol Register fail"); + close(_socket); + _socket = 0; + return -1; + } } - -int FlowControlProtocol::sendReportReq() -{ - uint16_t port = this->_serverPort; - - if (this->_socket <= 0) - this->_socket = connectServer(_serverName.c_str(), port); - - if (this->_socket <= 0) - return -1; - - // Calculate the total payload msg size - uint32_t payloadSize = - FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size()+1); - uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; - - uint8_t *data = new uint8_t[size]; - uint8_t *start = data; - - // encode the HDR - FlowControlProtocolHeader hdr; - hdr.msgType = REPORT_REQ; - hdr.payloadLen = payloadSize; - hdr.seqNumber = this->_seqNumber; - hdr.status = RESP_SUCCESS; - data = this->encode(data, hdr.msgType); - data = this->encode(data, hdr.seqNumber); - data = this->encode(data, hdr.status); - data = this->encode(data, hdr.payloadLen); - - // encode the YAML name - data = this->encode(data, FLOW_YML_NAME); - data = this->encode(data, this->_controller->getName()); - - // send it - int status = sendData(start, size); - delete[] start; - if (status <= 0) - { - close(_socket); - _socket = 0; - logger_->log_error("Flow Control Protocol Send Report Req failed"); - return -1; - } - - // Looking for report respond - status = readHdr(&hdr); - - if (status <= 0) - { - close(_socket); - _socket = 0; - logger_->log_error("Flow 
Control Protocol Read Report Resp header failed"); - return -1; - } - logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); - logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); - logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); - - if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) - { - this->_seqNumber++; - uint8_t *payload = new uint8_t[hdr.payloadLen]; - uint8_t *payloadPtr = payload; - status = readData(payload, hdr.payloadLen); - if (status <= 0) - { - delete[] payload; - logger_->log_info("Flow Control Protocol Report Resp Read Payload fail"); - close(_socket); - _socket = 0; - return -1; - } - std::string processor; - std::string propertyName; - std::string propertyValue; - while (payloadPtr < (payload + hdr.payloadLen)) - { - uint32_t msgID; - payloadPtr = this->decode(payloadPtr, msgID); - if (((FlowControlMsgID) msgID) == PROCESSOR_NAME) - { - uint32_t len; - payloadPtr = this->decode(payloadPtr, len); - processor = (const char *) payloadPtr; - payloadPtr += len; - logger_->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str()); - } - else if (((FlowControlMsgID) msgID) == PROPERTY_NAME) - { - uint32_t len; - payloadPtr = this->decode(payloadPtr, len); - propertyName = (const char *) payloadPtr; - payloadPtr += len; - logger_->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str()); - } - else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE) - { - uint32_t len; - payloadPtr = this->decode(payloadPtr, len); - propertyValue = (const char *) payloadPtr; - payloadPtr += len; - logger_->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str()); - 
this->_controller->updatePropertyValue(processor, propertyName, propertyValue); - } - else - { - break; - } - } - delete[] payload; - close(_socket); - _socket = 0; - return 0; - } - else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber) - { - logger_->log_info("Flow Control Protocol trigger reregister"); - this->_registered = false; - this->_seqNumber++; - close(_socket); - _socket = 0; - return 0; - } - else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) - { - logger_->log_info("Flow Control Protocol stop flow controller"); - this->_controller->stop(true); - this->_seqNumber++; - close(_socket); - _socket = 0; - return 0; - } - else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) - { - logger_->log_info("Flow Control Protocol start flow controller"); - this->_controller->start(); - this->_seqNumber++; - close(_socket); - _socket = 0; - return 0; - } - else - { - logger_->log_info("Flow Control Protocol Report fail"); - close(_socket); - _socket = 0; - return -1; - } +int FlowControlProtocol::sendReportReq() { + uint16_t port = this->_serverPort; + + if (this->_socket <= 0) + this->_socket = connectServer(_serverName.c_str(), port); + + if (this->_socket <= 0) + return -1; + + // Calculate the total payload msg size + uint32_t payloadSize = FlowControlMsgIDEncodingLen( + FLOW_YML_NAME, this->_controller->getName().size() + 1); + uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; + + uint8_t *data = new uint8_t[size]; + uint8_t *start = data; + + // encode the HDR + FlowControlProtocolHeader hdr; + hdr.msgType = REPORT_REQ; + hdr.payloadLen = payloadSize; + hdr.seqNumber = this->_seqNumber; + hdr.status = RESP_SUCCESS; + data = this->encode(data, hdr.msgType); + data = this->encode(data, hdr.seqNumber); + data = this->encode(data, hdr.status); + data = this->encode(data, hdr.payloadLen); + + // encode the YAML name + data = this->encode(data, 
FLOW_YML_NAME); + data = this->encode(data, this->_controller->getName()); + + // send it + int status = sendData(start, size); + delete[] start; + if (status <= 0) { + close(_socket); + _socket = 0; + logger_->log_error("Flow Control Protocol Send Report Req failed"); + return -1; + } + + // Looking for report respond + status = readHdr(&hdr); + + if (status <= 0) { + close(_socket); + _socket = 0; + logger_->log_error("Flow Control Protocol Read Report Resp header failed"); + return -1; + } + logger_->log_info("Flow Control Protocol receive MsgType %s", + FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); + logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); + logger_->log_info("Flow Control Protocol receive Resp Code %s", + FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); + logger_->log_info("Flow Control Protocol receive Payload len %d", + hdr.payloadLen); + + if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) { + this->_seqNumber++; + uint8_t *payload = new uint8_t[hdr.payloadLen]; + uint8_t *payloadPtr = payload; + status = readData(payload, hdr.payloadLen); + if (status <= 0) { + delete[] payload; + logger_->log_info("Flow Control Protocol Report Resp Read Payload fail"); + close(_socket); + _socket = 0; + return -1; + } + std::string processor; + std::string propertyName; + std::string propertyValue; + while (payloadPtr < (payload + hdr.payloadLen)) { + uint32_t msgID; + payloadPtr = this->decode(payloadPtr, msgID); + if (((FlowControlMsgID) msgID) == PROCESSOR_NAME) { + uint32_t len; + payloadPtr = this->decode(payloadPtr, len); + processor = (const char *) payloadPtr; + payloadPtr += len; + logger_->log_info( + "Flow Control Protocol receive report resp processor %s", + processor.c_str()); + } else if (((FlowControlMsgID) msgID) == PROPERTY_NAME) { + uint32_t len; + payloadPtr = this->decode(payloadPtr, len); + propertyName = (const char *) payloadPtr; + payloadPtr += len; + 
logger_->log_info( + "Flow Control Protocol receive report resp property name %s", + propertyName.c_str()); + } else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE) { + uint32_t len; + payloadPtr = this->decode(payloadPtr, len); + propertyValue = (const char *) payloadPtr; + payloadPtr += len; + logger_->log_info( + "Flow Control Protocol receive report resp property value %s", + propertyValue.c_str()); + this->_controller->updatePropertyValue(processor, propertyName, + propertyValue); + } else { + break; + } + } + delete[] payload; + close(_socket); + _socket = 0; + return 0; + } else if (hdr.status == RESP_TRIGGER_REGISTER + && hdr.seqNumber == this->_seqNumber) { + logger_->log_info("Flow Control Protocol trigger reregister"); + this->_registered = false; + this->_seqNumber++; + close(_socket); + _socket = 0; + return 0; + } else if (hdr.status == RESP_STOP_FLOW_CONTROLLER + && hdr.seqNumber == this->_seqNumber) { + logger_->log_info("Flow Control Protocol stop flow controller"); + this->_controller->stop(true); + this->_seqNumber++; + close(_socket); + _socket = 0; + return 0; + } else if (hdr.status == RESP_START_FLOW_CONTROLLER + && hdr.seqNumber == this->_seqNumber) { + logger_->log_info("Flow Control Protocol start flow controller"); + this->_controller->start(); + this->_seqNumber++; + close(_socket); + _socket = 0; + return 0; + } else { + logger_->log_info("Flow Control Protocol Report fail"); + close(_socket); + _socket = 0; + return -1; + } } +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
