http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Provenance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Provenance.h b/libminifi/include/Provenance.h deleted file mode 100644 index 3ba9792..0000000 --- a/libminifi/include/Provenance.h +++ /dev/null @@ -1,604 +0,0 @@ -/** - * @file Provenance.h - * Flow file record class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROVENANCE_H__ -#define __PROVENANCE_H__ - -#include <ftw.h> -#include <uuid/uuid.h> -#include <atomic> -#include <cstdint> -#include <cstring> -#include <iostream> -#include <map> -#include <set> -#include <string> -#include <thread> -#include <vector> - -#include "Configure.h" -#include "Connection.h" -#include "FlowFileRecord.h" -#include "Logger.h" -#include "Property.h" -#include "ResourceClaim.h" -#include "io/Serializable.h" -#include "utils/TimeUtil.h" -#include "Repository.h" - -class ProvenanceRepository; - -//! Provenance Event Record -class ProvenanceEventRecord : protected Serializable -{ -public: - enum ProvenanceEventType { - - /** - * A CREATE event is used when a FlowFile is generated from data that was - * not received from a remote system or external process - */ - CREATE, - - /** - * Indicates a provenance event for receiving data from an external process. This Event Type - * is expected to be the first event for a FlowFile. As such, a Processor that receives data - * from an external source and uses that data to replace the content of an existing FlowFile - * should use the {@link #FETCH} event type, rather than the RECEIVE event type. - */ - RECEIVE, - - /** - * Indicates that the contents of a FlowFile were overwritten using the contents of some - * external resource. This is similar to the {@link #RECEIVE} event but varies in that - * RECEIVE events are intended to be used as the event that introduces the FlowFile into - * the system, whereas FETCH is used to indicate that the contents of an existing FlowFile - * were overwritten. - */ - FETCH, - - /** - * Indicates a provenance event for sending data to an external process - */ - SEND, - - /** - * Indicates that the contents of a FlowFile were downloaded by a user or external entity. - */ - DOWNLOAD, - - /** - * Indicates a provenance event for the conclusion of an object's life for - * some reason other than object expiration - */ - DROP, - - /** - * Indicates a provenance event for the conclusion of an object's life due - * to the fact that the object could not be processed in a timely manner - */ - EXPIRE, - - /** - * FORK is used to indicate that one or more FlowFile was derived from a - * parent FlowFile. - */ - FORK, - - /** - * JOIN is used to indicate that a single FlowFile is derived from joining - * together multiple parent FlowFiles. - */ - JOIN, - - /** - * CLONE is used to indicate that a FlowFile is an exact duplicate of its - * parent FlowFile. - */ - CLONE, - - /** - * CONTENT_MODIFIED is used to indicate that a FlowFile's content was - * modified in some way. When using this Event Type, it is advisable to - * provide details about how the content is modified. - */ - CONTENT_MODIFIED, - - /** - * ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were - * modified in some way. This event is not needed when another event is - * reported at the same time, as the other event will already contain all - * FlowFile attributes. - */ - ATTRIBUTES_MODIFIED, - - /** - * ROUTE is used to show that a FlowFile was routed to a specified - * {@link org.apache.nifi.processor.Relationship Relationship} and should provide - * information about why the FlowFile was routed to this relationship. - */ - ROUTE, - - /** - * Indicates a provenance event for adding additional information such as a - * new linkage to a new URI or UUID - */ - ADDINFO, - - /** - * Indicates a provenance event for replaying a FlowFile. The UUID of the - * event will indicate the UUID of the original FlowFile that is being - * replayed. The event will contain exactly one Parent UUID that is also the - * UUID of the FlowFile that is being replayed and exactly one Child UUID - * that is the UUID of the a newly created FlowFile that will be re-queued - * for processing. - */ - REPLAY - }; - friend class ProcessSession; -public: - //! Constructor - /*! - * Create a new provenance event record - */ - ProvenanceEventRecord(ProvenanceEventType event, std::string componentId, std::string componentType) { - _eventType = event; - _componentId = componentId; - _componentType = componentType; - _eventTime = getTimeMillis(); - char eventIdStr[37]; - // Generate the global UUID for th event - uuid_generate(_eventId); - uuid_unparse_lower(_eventId, eventIdStr); - _eventIdStr = eventIdStr; - logger_ = Logger::getLogger(); - } - - ProvenanceEventRecord() { - _eventTime = getTimeMillis(); - logger_ = Logger::getLogger(); - } - - //! Destructor - virtual ~ProvenanceEventRecord() { - } - //! Get the Event ID - std::string getEventId() { - return _eventIdStr; - } - //! Get Attributes - std::map<std::string, std::string> getAttributes() { - return _attributes; - } - //! Get Size - uint64_t getFileSize() { - return _size; - } - // ! Get Offset - uint64_t getFileOffset() { - return _offset; - } - // ! Get Entry Date - uint64_t getFlowFileEntryDate() { - return _entryDate; - } - // ! Get Lineage Start Date - uint64_t getlineageStartDate() { - return _lineageStartDate; - } - // ! Get Event Time - uint64_t getEventTime() { - return _eventTime; - } - // ! Get Event Duration - uint64_t getEventDuration() { - return _eventDuration; - } - //! Set Event Duration - void setEventDuration(uint64_t duration) - { - _eventDuration = duration; - } - // ! Get Event Type - ProvenanceEventType getEventType() { - return _eventType; - } - //! Get Component ID - std::string getComponentId() - { - return _componentId; - } - //! Get Component Type - std::string getComponentType() - { - return _componentType; - } - //! Get FlowFileUuid - std::string getFlowFileUuid() - { - return _uuid; - } - //! Get content full path - std::string getContentFullPath() - { - return _contentFullPath; - } - //! Get LineageIdentifiers - std::set<std::string> getLineageIdentifiers() - { - return _lineageIdentifiers; - } - //! Get Details - std::string getDetails() - { - return _details; - } - //! Set Details - void setDetails(std::string details) - { - _details = details; - } - //! Get TransitUri - std::string getTransitUri() - { - return _transitUri; - } - //! Set TransitUri - void setTransitUri(std::string uri) - { - _transitUri = uri; - } - //! Get SourceSystemFlowFileIdentifier - std::string getSourceSystemFlowFileIdentifier() - { - return _sourceSystemFlowFileIdentifier; - } - //! Set SourceSystemFlowFileIdentifier - void setSourceSystemFlowFileIdentifier(std::string identifier) - { - _sourceSystemFlowFileIdentifier = identifier; - } - //! Get Parent UUIDs - std::vector<std::string> getParentUuids() - { - return _parentUuids; - } - //! Add Parent UUID - void addParentUuid(std::string uuid) - { - if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end()) - return; - else - _parentUuids.push_back(uuid); - } - //! Add Parent Flow File - void addParentFlowFile(FlowFileRecord *flow) - { - addParentUuid(flow->getUUIDStr()); - return; - } - //! Remove Parent UUID - void removeParentUuid(std::string uuid) - { - _parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end()); - } - //! Remove Parent Flow File - void removeParentFlowFile(FlowFileRecord *flow) - { - removeParentUuid(flow->getUUIDStr()); - return; - } - //! Get Children UUIDs - std::vector<std::string> getChildrenUuids() - { - return _childrenUuids; - } - //! Add Child UUID - void addChildUuid(std::string uuid) - { - if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end()) - return; - else - _childrenUuids.push_back(uuid); - } - //! Add Child Flow File - void addChildFlowFile(FlowFileRecord *flow) - { - addChildUuid(flow->getUUIDStr()); - return; - } - //! Remove Child UUID - void removeChildUuid(std::string uuid) - { - _childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end()); - } - //! Remove Child Flow File - void removeChildFlowFile(FlowFileRecord *flow) - { - removeChildUuid(flow->getUUIDStr()); - return; - } - //! Get AlternateIdentifierUri - std::string getAlternateIdentifierUri() - { - return _alternateIdentifierUri; - } - //! Set AlternateIdentifierUri - void setAlternateIdentifierUri(std::string uri) - { - _alternateIdentifierUri = uri; - } - //! Get Relationship - std::string getRelationship() - { - return _relationship; - } - //! Set Relationship - void setRelationship(std::string relation) - { - _relationship = relation; - } - //! Get sourceQueueIdentifier - std::string getSourceQueueIdentifier() - { - return _sourceQueueIdentifier; - } - //! Set sourceQueueIdentifier - void setSourceQueueIdentifier(std::string identifier) - { - _sourceQueueIdentifier = identifier; - } - //! fromFlowFile - void fromFlowFile(FlowFileRecord *flow) - { - _entryDate = flow->getEntryDate(); - _lineageStartDate = flow->getlineageStartDate(); - _lineageIdentifiers = flow->getlineageIdentifiers(); - _uuid = flow->getUUIDStr(); - _attributes = flow->getAttributes(); - _size = flow->getSize(); - _offset = flow->getOffset(); - if (flow->getOriginalConnection()) - _sourceQueueIdentifier = flow->getOriginalConnection()->getName(); - if (flow->getResourceClaim()) - { - _contentFullPath = flow->getResourceClaim()->getContentFullPath(); - } - } - //! Serialize and Persistent to the repository - bool Serialize(ProvenanceRepository *repo); - //! DeSerialize - bool DeSerialize(const uint8_t *buffer, const int bufferSize); - //! DeSerialize - bool DeSerialize(DataStream &stream) - { - return DeSerialize(stream.getBuffer(),stream.getSize()); - } - //! DeSerialize - bool DeSerialize(ProvenanceRepository *repo, std::string key); - -protected: - - //! Event type - ProvenanceEventType _eventType; - //! Date at which the event was created - uint64_t _eventTime; - //! Date at which the flow file entered the flow - uint64_t _entryDate; - //! Date at which the origin of this flow file entered the flow - uint64_t _lineageStartDate; - //! Event Duration - uint64_t _eventDuration; - //! Component ID - std::string _componentId; - //! Component Type - std::string _componentType; - //! Size in bytes of the data corresponding to this flow file - uint64_t _size; - //! flow uuid - std::string _uuid; - //! Offset to the content - uint64_t _offset; - //! Full path to the content - std::string _contentFullPath; - //! Attributes key/values pairs for the flow record - std::map<std::string, std::string> _attributes; - //! provenance ID - uuid_t _eventId; - //! UUID string for all parents - std::set<std::string> _lineageIdentifiers; - //! transitUri - std::string _transitUri; - //! sourceSystemFlowFileIdentifier - std::string _sourceSystemFlowFileIdentifier; - //! parent UUID - std::vector<std::string> _parentUuids; - //! child UUID - std::vector<std::string> _childrenUuids; - //! detail - std::string _details; - //! sourceQueueIdentifier - std::string _sourceQueueIdentifier; - //! event ID Str - std::string _eventIdStr; - //! relationship - std::string _relationship; - //! alternateIdentifierUri; - std::string _alternateIdentifierUri; - -private: - - //! Logger - std::shared_ptr<Logger> logger_; - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProvenanceEventRecord(const ProvenanceEventRecord &parent); - ProvenanceEventRecord &operator=(const ProvenanceEventRecord &parent); - -}; - -//! Provenance Reporter -class ProvenanceReporter -{ - friend class ProcessSession; -public: - //! Constructor - /*! - * Create a new provenance reporter associated with the process session - */ - ProvenanceReporter(std::string componentId, std::string componentType) { - logger_ = Logger::getLogger(); - _componentId = componentId; - _componentType = componentType; - } - - //! Destructor - virtual ~ProvenanceReporter() { - clear(); - } - //! Get events - std::set<ProvenanceEventRecord *> getEvents() - { - return _events; - } - //! Add event - void add(ProvenanceEventRecord *event) - { - _events.insert(event); - } - //! Remove event - void remove(ProvenanceEventRecord *event) - { - if (_events.find(event) != _events.end()) - { - _events.erase(event); - } - } - //! - //! clear - void clear() - { - for (auto it : _events) - { - delete it; - } - _events.clear(); - } - //! allocate - ProvenanceEventRecord *allocate(ProvenanceEventRecord::ProvenanceEventType eventType, FlowFileRecord *flow) - { - ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, _componentId, _componentType); - if (event) - event->fromFlowFile(flow); - - return event; - } - //! commit - void commit(); - //! create - void create(FlowFileRecord *flow, std::string detail); - //! route - void route(FlowFileRecord *flow, Relationship relation, std::string detail, uint64_t processingDuration); - //! modifyAttributes - void modifyAttributes(FlowFileRecord *flow, std::string detail); - //! modifyContent - void modifyContent(FlowFileRecord *flow, std::string detail, uint64_t processingDuration); - //! clone - void clone(FlowFileRecord *parent, FlowFileRecord *child); - //! join - void join(std::vector<FlowFileRecord *> parents, FlowFileRecord *child, std::string detail, uint64_t processingDuration); - //! fork - void fork(std::vector<FlowFileRecord *> child, FlowFileRecord *parent, std::string detail, uint64_t processingDuration); - //! expire - void expire(FlowFileRecord *flow, std::string detail); - //! drop - void drop(FlowFileRecord *flow, std::string reason); - //! send - void send(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force); - //! fetch - void fetch(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration); - //! receive - void receive(FlowFileRecord *flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration); - -protected: - - //! Component ID - std::string _componentId; - //! Component Type - std::string _componentType; - -private: - - //! Incoming connection Iterator - std::set<ProvenanceEventRecord *> _events; - //! Logger - std::shared_ptr<Logger> logger_; - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProvenanceReporter(const ProvenanceReporter &parent); - ProvenanceReporter &operator=(const ProvenanceReporter &parent); -}; - -#define PROVENANCE_DIRECTORY "./provenance_repository" -#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M -#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute -#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec - -//! Provenance Repository -class ProvenanceRepository : public Repository -{ -public: - //! Constructor - /*! - * Create a new provenance repository - */ - ProvenanceRepository() - : Repository(Repository::PROVENANCE, PROVENANCE_DIRECTORY, - MAX_PROVENANCE_ENTRY_LIFE_TIME, MAX_PROVENANCE_STORAGE_SIZE, PROVENANCE_PURGE_PERIOD) - { - } - - //! Destructor - virtual ~ProvenanceRepository() { - } - - //! Persistent event - void registerEvent(ProvenanceEventRecord *event) - { - event->Serialize(this); - } - //! Remove event - void removeEvent(ProvenanceEventRecord *event) - { - Delete(event->getEventId()); - } - -protected: - -private: - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProvenanceRepository(const ProvenanceRepository &parent); - ProvenanceRepository &operator=(const ProvenanceRepository &parent); -}; - -#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/PutFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/PutFile.h b/libminifi/include/PutFile.h deleted file mode 100644 index 015605e..0000000 --- a/libminifi/include/PutFile.h +++ /dev/null @@ -1,88 +0,0 @@ -/** - * @file PutFile.h - * PutFile class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PUT_FILE_H__ -#define __PUT_FILE_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! PutFile Class -class PutFile : public Processor -{ -public: - - static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE; - static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE; - static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL; - - //! Constructor - /*! - * Create a new processor - */ - PutFile(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - logger_ = Logger::getLogger(); - } - //! Destructor - virtual ~PutFile() - { - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property Directory; - static Property ConflictResolution; - //! Supported Relationships - static Relationship Success; - static Relationship Failure; - - //! OnTrigger method, implemented by NiFi PutFile - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi PutFile - virtual void initialize(void); - - class ReadCallback : public InputStreamCallback - { - public: - ReadCallback(const std::string &tmpFile, const std::string &destFile); - ~ReadCallback(); - virtual void process(std::ifstream *stream); - bool commit(); - - private: - std::shared_ptr<Logger> logger_; - std::ofstream _tmpFileOs; - bool _writeSucceeded = false; - std::string _tmpFile; - std::string _destFile; - }; - -protected: - -private: - //! Logger - std::shared_ptr<Logger> logger_; - - bool putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile); -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/RealTimeDataCollector.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RealTimeDataCollector.h b/libminifi/include/RealTimeDataCollector.h deleted file mode 100644 index 3b6d05f..0000000 --- a/libminifi/include/RealTimeDataCollector.h +++ /dev/null @@ -1,131 +0,0 @@ -/** - * @file RealTimeDataCollector.h - * RealTimeDataCollector class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __REAL_TIME_DATA_COLLECTOR_H__ -#define __REAL_TIME_DATA_COLLECTOR_H__ - -#include <stdio.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <fcntl.h> -#include <netdb.h> -#include <string> -#include <errno.h> -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! RealTimeDataCollector Class -class RealTimeDataCollector : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - RealTimeDataCollector(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _realTimeSocket = 0; - _batchSocket = 0; - logger_ = Logger::getLogger(); - _firstInvoking = false; - _realTimeAccumulated = 0; - _batchAcccumulated = 0; - _queuedDataSize = 0; - } - //! Destructor - virtual ~RealTimeDataCollector() - { - if (_realTimeSocket) - close(_realTimeSocket); - if (_batchSocket) - close(_batchSocket); - if (_fileStream.is_open()) - _fileStream.close(); - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property REALTIMESERVERNAME; - static Property REALTIMESERVERPORT; - static Property BATCHSERVERNAME; - static Property BATCHSERVERPORT; - static Property FILENAME; - static Property ITERATION; - static Property REALTIMEMSGID; - static Property BATCHMSGID; - static Property REALTIMEINTERVAL; - static Property BATCHINTERVAL; - static Property BATCHMAXBUFFERSIZE; - //! Supported Relationships - static Relationship Success; - //! Connect to the socket - int connectServer(const char *host, uint16_t port); - int sendData(int socket, const char *buf, int buflen); - void onTriggerRealTime(ProcessContext *context, ProcessSession *session); - void onTriggerBatch(ProcessContext *context, ProcessSession *session); - -public: - //! OnTrigger method, implemented by NiFi RealTimeDataCollector - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi RealTimeDataCollector - virtual void initialize(void); - -protected: - -private: - //! realtime server Name - std::string _realTimeServerName; - int64_t _realTimeServerPort; - std::string _batchServerName; - int64_t _batchServerPort; - int64_t _realTimeInterval; - int64_t _batchInterval; - int64_t _batchMaxBufferSize; - //! Match pattern for Real time Message ID - std::vector<std::string> _realTimeMsgID; - //! Match pattern for Batch Message ID - std::vector<std::string> _batchMsgID; - //! file for which the realTime collector will tail - std::string _fileName; - //! Whether we need to iterate from the beginning for demo - bool _iteration; - int _realTimeSocket; - int _batchSocket; - //! Logger - std::shared_ptr<Logger> logger_; - //! Mutex for protection - std::mutex _mtx; - //! Queued data size - uint64_t _queuedDataSize; - //! Queue for the batch process - std::queue<std::string> _queue; - std::thread::id _realTimeThreadId; - std::thread::id _batchThreadId; - std::atomic<bool> _firstInvoking; - int64_t _realTimeAccumulated; - int64_t _batchAcccumulated; - std::ifstream _fileStream; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Relationship.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Relationship.h b/libminifi/include/Relationship.h deleted file mode 100644 index 3454ee5..0000000 --- a/libminifi/include/Relationship.h +++ /dev/null @@ -1,87 +0,0 @@ -/** - * @file Relationship.h - * Relationship class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __RELATIONSHIP_H__ -#define __RELATIONSHIP_H__ - -#include <string> -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> - -//! undefined relationship for remote process group outgoing port and root process group incoming port -#define UNDEFINED_RELATIONSHIP "undefined" - -inline bool isRelationshipNameUndefined(std::string name) -{ - if (name == UNDEFINED_RELATIONSHIP) - return true; - else - return false; -} - -//! Relationship Class -class Relationship { - -public: - //! Constructor - /*! - * Create a new relationship - */ - Relationship(const std::string name, const std::string description) - : _name(name), _description(description) { - } - Relationship() - : _name(UNDEFINED_RELATIONSHIP) { - } - //! Destructor - virtual ~Relationship() { - } - //! Get Name for the relationship - std::string getName() { - return _name; - } - //! Get Description for the relationship - std::string getDescription() { - return _description; - } - //! Compare - bool operator < (const Relationship & right) const { - return _name < right._name; - } - //! Whether it is a undefined relationship - bool isRelationshipUndefined() - { - return isRelationshipNameUndefined(_name); - } - -protected: - - //! Name - std::string _name; - //! Description - std::string _description; - -private: -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index 05ecd17..e9a4228 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -20,71 +20,91 @@ #ifndef __REMOTE_PROCESSOR_GROUP_PORT_H__ #define __REMOTE_PROCESSOR_GROUP_PORT_H__ +#include <mutex> #include <memory> +#include <stack> #include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" #include "Site2SiteClientProtocol.h" -//! RemoteProcessorGroupPort Class -class RemoteProcessorGroupPort: public Processor { -public: - //! Constructor - /*! - * Create a new processor - */ - RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL) : - Processor(name, uuid), direction_(SEND), transmitting_(false), peer_() { - logger_ = Logger::getLogger(); - protocol_ = std::unique_ptr<Site2SiteClientProtocol>( - new Site2SiteClientProtocol(0)); - protocol_->setPortId(uuid); - } - //! Destructor - virtual ~RemoteProcessorGroupPort() { - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property hostName; - static Property port; - //! Supported Relationships - static Relationship relation; -public: - //! OnTrigger method, implemented by NiFi RemoteProcessorGroupPort - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi RemoteProcessorGroupPort - virtual void initialize(void); - //! Set Direction - void setDirection(TransferDirection direction) { - direction_ = direction; - if (direction_ == RECEIVE) - this->setTriggerWhenEmpty(true); - } - //! Set Timeout - void setTimeOut(uint64_t timeout) { - protocol_->setTimeOut(timeout); - } - //! SetTransmitting - void setTransmitting(bool val) { - transmitting_ = val; - } +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +// RemoteProcessorGroupPort Class +class RemoteProcessorGroupPort : + public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid), + direction_(SEND), + transmitting_(false){ + logger_ = logging::Logger::getLogger(); + uuid_copy(protocol_uuid_,uuid); + } + // Destructor + virtual ~RemoteProcessorGroupPort() { -protected: + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property hostName; + static core::Property port; + // Supported Relationships + static core::Relationship relation; + public: + // OnTrigger method, implemented by NiFi RemoteProcessorGroupPort + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi RemoteProcessorGroupPort + virtual void initialize(void); + // Set Direction + void setDirection(TransferDirection direction) { + direction_ = direction; + if (direction_ == RECEIVE) + this->setTriggerWhenEmpty(true); + } + // Set Timeout + void setTimeOut(uint64_t timeout) { + timeout_ = timeout; + } + // SetTransmitting + void setTransmitting(bool val) { + transmitting_ = val; + } -private: - //! Logger - std::shared_ptr<Logger> logger_; - //! Peer Connection - Site2SitePeer peer_; - //! Peer Protocol - std::unique_ptr<Site2SiteClientProtocol> protocol_; - //! Transaction Direction - TransferDirection direction_; - //! Transmitting - bool transmitting_; + protected: + private: + + std::unique_ptr<Site2SiteClientProtocol> getNextProtocol(); + void returnProtocol(std::unique_ptr<Site2SiteClientProtocol> protocol); + + std::stack<std::unique_ptr<Site2SiteClientProtocol>> available_protocols_; + std::mutex protocol_mutex_; + // Logger + std::shared_ptr<logging::Logger> logger_; + // Transaction Direction + TransferDirection direction_; + // Transmitting + bool transmitting_; + // timeout + uint64_t timeout_; + + uuid_t protocol_uuid_; + }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Repository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Repository.h b/libminifi/include/Repository.h deleted file mode 100644 index 55fb442..0000000 --- a/libminifi/include/Repository.h +++ /dev/null @@ -1,318 +0,0 @@ -/** - * @file Repository - * Repository class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __REPOSITORY_H__ -#define __REPOSITORY_H__ - -#include <ftw.h> -#include <uuid/uuid.h> -#include <atomic> -#include <cstdint> -#include <cstring> -#include <iostream> -#include <map> -#include <set> -#include <string> -#include <thread> -#include <vector> - -#ifdef LEVELDB_SUPPORT -#include "leveldb/db.h" -#include "leveldb/options.h" -#include "leveldb/slice.h" -#include "leveldb/status.h" -#endif -#include "Configure.h" -#include "Connection.h" -#include "FlowFileRecord.h" -#include "Logger.h" -#include "Property.h" -#include "ResourceClaim.h" -#include "io/Serializable.h" -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" - -//! Repository -class Repository -{ -public: - enum RepositoryType { - //! Provenance Repo Type - PROVENANCE, - //! FlowFile Repo Type - FLOWFILE, - MAX_REPO_TYPE - }; - static const char *RepositoryTypeStr[MAX_REPO_TYPE]; - //! Constructor - /*! - * Create a new provenance repository - */ - Repository(RepositoryType type, std::string directory, - int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) { - _type = type; - _directory = directory; - _maxPartitionMillis = maxPartitionMillis; - _maxPartitionBytes = maxPartitionBytes; - _purgePeriod = purgePeriod; - logger_ = Logger::getLogger(); - configure_ = Configure::getConfigure(); -#ifdef LEVELDB_SUPPORT - _db = NULL; -#endif - _thread = NULL; - _running = false; - _repoFull = false; - _enable = true; - } - - //! Destructor - virtual ~Repository() { - stop(); - if (this->_thread) - delete this->_thread; - destroy(); - } - - //! initialize - virtual bool initialize() - { - std::string value; - -#ifdef LEVELDB_SUPPORT - if (_type == PROVENANCE) - { - if (!(configure_->get(Configure::nifi_provenance_repository_enable, value) - && StringUtils::StringToBool(value, _enable))) { - _enable = true; - } - if (!_enable) - return false; - if (configure_->get(Configure::nifi_provenance_repository_directory_default, value)) - { - _directory = value; - } - logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str()); - if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value)) - { - Property::StringToInt(value, _maxPartitionBytes); - } - logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes); - if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _maxPartitionMillis, unit) && - Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis)) - { - } - } - logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis); - leveldb::Options options; - options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db); - if (status.ok()) - { - logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str()); - } - else - { - logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str()); - return false; - } - } - - if (_type == FLOWFILE) - { - if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value) - && StringUtils::StringToBool(value, _enable))) { - _enable = true; - } - if (!_enable) - return false; - if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value)) - { - _directory = value; - } - logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str()); - if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value)) - { - Property::StringToInt(value, _maxPartitionBytes); - } - logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes); - if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _maxPartitionMillis, unit) && - Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis)) - { - } - } - logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis); - leveldb::Options options; - options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db); - if (status.ok()) - { - logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str()); - } - else - { - logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str()); - return false; - } - } - - return true; -#else - return false; -#endif - } - //! Put - virtual bool Put(std::string key, uint8_t *buf, int bufLen) - { -#ifdef LEVELDB_SUPPORT - if (!_enable) - return false; - - // persistent to the DB - leveldb::Slice value((const char *) buf, bufLen); - leveldb::Status status; - status = _db->Put(leveldb::WriteOptions(), key, value); - if (status.ok()) - return true; - else - return false; -#else - return false; -#endif - } - //! Delete - virtual bool Delete(std::string key) - { -#ifdef LEVELDB_SUPPORT - if (!_enable) - return false; - leveldb::Status status; - status = _db->Delete(leveldb::WriteOptions(), key); - if (status.ok()) - return true; - else - return false; -#else - return false; -#endif - } - //! Get - virtual bool Get(std::string key, std::string &value) - { -#ifdef LEVELDB_SUPPORT - if (!_enable) - return false; - leveldb::Status status; - status = _db->Get(leveldb::ReadOptions(), key, &value); - if (status.ok()) - return true; - else - return false; -#else - return false; -#endif - } - //! Run function for the thread - static void run(Repository *repo); - //! Start the repository monitor thread - virtual void start(); - //! Stop the repository monitor thread - virtual void stop(); - //! whether the repo is full - virtual bool isFull() - { - return _repoFull; - } - //! whether the repo is enable - virtual bool isEnable() - { - return _enable; - } - -protected: - //! Repo Type - RepositoryType _type; - //! Mutex for protection - std::mutex _mtx; - //! repository directory - std::string _directory; - //! Logger - std::shared_ptr<Logger> logger_; - //! Configure - //! max db entry life time - Configure *configure_; - int64_t _maxPartitionMillis; - //! max db size - int64_t _maxPartitionBytes; - //! purge period - uint64_t _purgePeriod; -#ifdef LEVELDB_SUPPORT - //! level DB database - leveldb::DB* _db; -#endif - //! thread - std::thread *_thread; - //! whether the monitoring thread is running for the repo while it was enabled - bool _running; - //! whether it is enabled by minfi property for the repo - bool _enable; - //! whether stop accepting provenace event - std::atomic<bool> _repoFull; - //! repoSize - uint64_t repoSize(); - //! size of the directory - static uint64_t _repoSize[MAX_REPO_TYPE]; - //! call back for directory size - static int repoSumProvenance(const char *fpath, const struct stat *sb, int typeflag) - { - _repoSize[PROVENANCE] += sb->st_size; - return 0; - } - //! call back for directory size - static int repoSumFlowFile(const char *fpath, const struct stat *sb, int typeflag) - { - _repoSize[FLOWFILE] += sb->st_size; - return 0; - } - -private: - //! destroy - void destroy() - { -#ifdef LEVELDB_SUPPORT - if (_db) - { - delete _db; - _db = NULL; - } -#endif - } - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Repository(const Repository &parent); - Repository &operator=(const Repository &parent); -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ResourceClaim.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h index 7ca79a3..4c5438c 100644 --- a/libminifi/include/ResourceClaim.h +++ b/libminifi/include/ResourceClaim.h @@ -27,75 +27,85 @@ #include <map> #include <mutex> #include <atomic> -#include "Configure.h" +#include "properties/Configure.h" -//! Default content directory +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + + +// Default content directory #define DEFAULT_CONTENT_DIRECTORY "./content_repository" -//! ResourceClaim Class +// ResourceClaim Class class ResourceClaim { public: static std::string default_directory_path; - //! Constructor + // Constructor /*! * Create a new resource claim */ ResourceClaim(const std::string contentDirectory = default_directory_path); - //! Destructor + // Destructor virtual ~ResourceClaim() {} - //! increaseFlowFileRecordOwnedCount + // increaseFlowFileRecordOwnedCount void increaseFlowFileRecordOwnedCount() { ++_flowFileRecordOwnedCount; } - //! decreaseFlowFileRecordOwenedCount + // decreaseFlowFileRecordOwenedCount void decreaseFlowFileRecordOwnedCount() { --_flowFileRecordOwnedCount; } - //! getFlowFileRecordOwenedCount + // getFlowFileRecordOwenedCount uint64_t getFlowFileRecordOwnedCount() { return _flowFileRecordOwnedCount; } - //! Get the content full path + // Get the content full path std::string getContentFullPath() { return _contentFullPath; } - //! Set the content full path + // Set the content full path void setContentFullPath(std::string path) { _contentFullPath = path; } protected: - //! A global unique identifier + // A global unique identifier uuid_t _uuid; - //! A local unique identifier + // A local unique identifier uint64_t _id; - //! Full path to the content + // Full path to the content std::string _contentFullPath; - //! How many FlowFileRecord Own this cliam + // How many FlowFileRecord Own this cliam std::atomic<uint64_t> _flowFileRecordOwnedCount; private: - //! Configure + // Configure Configure *configure_; - //! Logger - std::shared_ptr<Logger> logger_; + // Logger + std::shared_ptr<logging::Logger> logger_; // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer ResourceClaim(const ResourceClaim &parent); ResourceClaim &operator=(const ResourceClaim &parent); - //! Local resource claim number + // Local resource claim number static std::atomic<uint64_t> _localResourceClaimNumber; }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index f6d5a1c..0493640 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -28,72 +28,92 @@ #include <algorithm> #include <thread> #include "utils/TimeUtil.h" -#include "Logger.h" -#include "Configure.h" +#include "core/core.h" +#include "core/logging/Logger.h" +#include "properties/Configure.h" #include "FlowFileRecord.h" -#include "Logger.h" -#include "Processor.h" -#include "ProcessContext.h" +#include "core/logging/Logger.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "provenance/ProvenanceRepository.h" -//! SchedulingAgent Class -class SchedulingAgent -{ -public: - //! Constructor - /*! - * Create a new processor - */ - SchedulingAgent() { - configure_ = Configure::getConfigure(); - logger_ = Logger::getLogger(); - _running = false; - } - //! Destructor - virtual ~SchedulingAgent() - { - } - //! onTrigger, return whether the yield is need - bool onTrigger(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory); - //! Whether agent has work to do - bool hasWorkToDo(Processor *processor); - //! Whether the outgoing need to be backpressure - bool hasTooMuchOutGoing(Processor *processor); - //! start - void start() { - _running = true; - } - //! stop - void stop() { - _running = false; - } +namespace org { +namespace apache { +namespace nifi { +namespace minifi { -public: - //! schedule, overwritten by different DrivenSchedulingAgent - virtual void schedule(Processor *processor) = 0; - //! unschedule, overwritten by different DrivenSchedulingAgent - virtual void unschedule(Processor *processor) = 0; -protected: - //! Logger - std::shared_ptr<Logger> logger_; - //! Configure - Configure *configure_; - //! Mutex for protection - std::mutex _mtx; - //! Whether it is running - std::atomic<bool> _running; - //! AdministrativeYieldDuration - int64_t _administrativeYieldDuration; - //! BoredYieldDuration - int64_t _boredYieldDuration; +// SchedulingAgent Class +class SchedulingAgent { + public: + // Constructor + /*! + * Create a new processor + */ + SchedulingAgent(std::shared_ptr<core::Repository> repo) { + configure_ = Configure::getConfigure(); + logger_ = logging::Logger::getLogger(); + running_ = false; + repo_ = repo; + } + // Destructor + virtual ~SchedulingAgent() { -private: - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - SchedulingAgent(const SchedulingAgent &parent); - SchedulingAgent &operator=(const SchedulingAgent &parent); + } + // onTrigger, return whether the yield is need + bool onTrigger( + std::shared_ptr<core::Processor> processor, + core::ProcessContext *processContext, + core::ProcessSessionFactory *sessionFactory); + // Whether agent has work to do + bool hasWorkToDo(std::shared_ptr<core::Processor> processor); + // Whether the outgoing need to be backpressure + bool hasTooMuchOutGoing( + std::shared_ptr<core::Processor> processor); + // start + void start() { + running_ = true; + } + // stop + void stop() { + running_ = false; + } + + public: + // schedule, overwritten by different DrivenSchedulingAgent + virtual void schedule( + std::shared_ptr<core::Processor> processor) = 0; + // unschedule, overwritten by different DrivenSchedulingAgent + virtual void unschedule( + std::shared_ptr<core::Processor> processor) = 0; + + SchedulingAgent(const SchedulingAgent &parent) = delete; + SchedulingAgent &operator=(const SchedulingAgent &parent) = delete; + protected: + // Logger + std::shared_ptr<logging::Logger> logger_; + // Configure + Configure *configure_; + // Mutex for protection + std::mutex mutex_; + // Whether it is running + std::atomic<bool> running_; + // AdministrativeYieldDuration + int64_t _administrativeYieldDuration; + // BoredYieldDuration + int64_t _boredYieldDuration; + + std::shared_ptr<core::Repository> repo_; + + private: + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h index 444eed5..6120e3e 100644 --- a/libminifi/include/Site2SiteClientProtocol.h +++ b/libminifi/include/Site2SiteClientProtocol.h @@ -34,16 +34,23 @@ #include <thread> #include <algorithm> #include <uuid/uuid.h> -#include "Configure.h" -#include "Property.h" + +#include "core/Property.h" +#include "properties/Configure.h" #include "Site2SitePeer.h" #include "FlowFileRecord.h" -#include "Logger.h" -#include "ProcessContext.h" -#include "ProcessSession.h" +#include "core/logging/Logger.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" #include "io/CRCStream.h" -//! Resource Negotiated Status Code + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +// Resource Negotiated Status Code #define RESOURCE_OK 20 #define DIFFERENT_RESOURCE_VERSION 21 #define NEGOTIATED_ABORT 255 @@ -55,143 +62,144 @@ * transferred between a client and a remote NiFi instance. */ typedef enum { - /** - * * The client is to send data to the remote instance. - * */ - SEND, - /** - * * The client is to receive data from the remote instance. - * */ - RECEIVE + /** + * * The client is to send data to the remote instance. + * */ + SEND, + /** + * * The client is to receive data from the remote instance. + * */ + RECEIVE } TransferDirection; -//! Peer State +// Peer State typedef enum { - /** - * * IDLE - * */ - IDLE = 0, - /** - * * Socket Established - * */ - ESTABLISHED, - /** - * * HandShake Done - * */ - HANDSHAKED, - /** - * * After CodeDec Completion - * */ - READY + /** + * * IDLE + * */ + IDLE = 0, + /** + * * Socket Established + * */ + ESTABLISHED, + /** + * * HandShake Done + * */ + HANDSHAKED, + /** + * * After CodeDec Completion + * */ + READY } PeerState; -//! Transaction State +// Transaction State typedef enum { - /** - * * Transaction has been started but no data has been sent or received. - * */ - TRANSACTION_STARTED, - /** - * * Transaction has been started and data has been sent or received. - * */ - DATA_EXCHANGED, - /** - * * Data that has been transferred has been confirmed via its CRC. - * * Transaction is ready to be completed. - * */ - TRANSACTION_CONFIRMED, - /** - * * Transaction has been successfully completed. - * */ - TRANSACTION_COMPLETED, - /** - * * The Transaction has been canceled. - * */ - TRANSACTION_CANCELED, - /** - * * The Transaction ended in an error. - * */ - TRANSACTION_ERROR + /** + * * Transaction has been started but no data has been sent or received. + * */ + TRANSACTION_STARTED, + /** + * * Transaction has been started and data has been sent or received. + * */ + DATA_EXCHANGED, + /** + * * Data that has been transferred has been confirmed via its CRC. + * * Transaction is ready to be completed. + * */ + TRANSACTION_CONFIRMED, + /** + * * Transaction has been successfully completed. + * */ + TRANSACTION_COMPLETED, + /** + * * The Transaction has been canceled. + * */ + TRANSACTION_CANCELED, + /** + * * The Transaction ended in an error. + * */ + TRANSACTION_ERROR } TransactionState; -//! Request Type +// Request Type typedef enum { - NEGOTIATE_FLOWFILE_CODEC = 0, - REQUEST_PEER_LIST, - SEND_FLOWFILES, - RECEIVE_FLOWFILES, - SHUTDOWN, - MAX_REQUEST_TYPE + NEGOTIATE_FLOWFILE_CODEC = 0, + REQUEST_PEER_LIST, + SEND_FLOWFILES, + RECEIVE_FLOWFILES, + SHUTDOWN, + MAX_REQUEST_TYPE } RequestType; -//! Request Type Str +// Request Type Str static const char *RequestTypeStr[MAX_REQUEST_TYPE] = { - "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", "SEND_FLOWFILES", - "RECEIVE_FLOWFILES", "SHUTDOWN" }; + "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", "SEND_FLOWFILES", + "RECEIVE_FLOWFILES", "SHUTDOWN" }; -//! Respond Code +// Respond Code typedef enum { - RESERVED = 0, - // ResponseCode, so that we can indicate a 0 followed by some other bytes - - // handshaking properties - PROPERTIES_OK = 1, - UNKNOWN_PROPERTY_NAME = 230, - ILLEGAL_PROPERTY_VALUE = 231, - MISSING_PROPERTY = 232, - // transaction indicators - CONTINUE_TRANSACTION = 10, - FINISH_TRANSACTION = 11, - CONFIRM_TRANSACTION = 12, // "Explanation" of this code is the checksum - TRANSACTION_FINISHED = 13, - TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14, - CANCEL_TRANSACTION = 15, - BAD_CHECKSUM = 19, - // data availability indicators - MORE_DATA = 20, - NO_MORE_DATA = 21, - // port state indicators - UNKNOWN_PORT = 200, - PORT_NOT_IN_VALID_STATE = 201, - PORTS_DESTINATION_FULL = 202, - // authorization - UNAUTHORIZED = 240, - // error indicators - ABORT = 250, - UNRECOGNIZED_RESPONSE_CODE = 254, - END_OF_STREAM = 255 + RESERVED = 0, + // ResponseCode, so that we can indicate a 0 followed by some other bytes + + // handshaking properties + PROPERTIES_OK = 1, + UNKNOWN_PROPERTY_NAME = 230, + ILLEGAL_PROPERTY_VALUE = 231, + MISSING_PROPERTY = 232, + // transaction indicators + CONTINUE_TRANSACTION = 10, + FINISH_TRANSACTION = 11, + CONFIRM_TRANSACTION = 12, // "Explanation" of this code is the checksum + TRANSACTION_FINISHED = 13, + TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14, + CANCEL_TRANSACTION = 15, + BAD_CHECKSUM = 19, + // data availability indicators + MORE_DATA = 20, + NO_MORE_DATA = 21, + // port state indicators + UNKNOWN_PORT = 200, + PORT_NOT_IN_VALID_STATE = 201, + PORTS_DESTINATION_FULL = 202, + // authorization + UNAUTHORIZED = 240, + // error indicators + ABORT = 250, + UNRECOGNIZED_RESPONSE_CODE = 254, + END_OF_STREAM = 255 } RespondCode; -//! Respond Code Class +// Respond Code Class typedef struct { - RespondCode code; - const char *description; - bool hasDescription; + RespondCode code; + const char *description; + bool hasDescription; } RespondCodeContext; -//! Respond Code Context +// Respond Code Context static RespondCodeContext respondCodeContext[] = { { RESERVED, - "Reserved for Future Use", false }, { PROPERTIES_OK, "Properties OK", - false }, { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true }, { - ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true }, { - MISSING_PROPERTY, "Missing Property", true }, { CONTINUE_TRANSACTION, - "Continue Transaction", false }, { FINISH_TRANSACTION, - "Finish Transaction", false }, { CONFIRM_TRANSACTION, - "Confirm Transaction", true }, { TRANSACTION_FINISHED, - "Transaction Finished", false }, { - TRANSACTION_FINISHED_BUT_DESTINATION_FULL, - "Transaction Finished But Destination is Full", false }, { - CANCEL_TRANSACTION, "Cancel Transaction", true }, { BAD_CHECKSUM, - "Bad Checksum", false }, { MORE_DATA, "More Data Exists", false }, { - NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT, - "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE, - "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL, - "Port's Destination is Full", false }, { UNAUTHORIZED, - "User Not Authorized", true }, { ABORT, "Abort", true }, { - UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, { - END_OF_STREAM, "End of Stream", false } }; - -//! Respond Code Sequence Pattern + "Reserved for Future Use", false }, + { PROPERTIES_OK, "Properties OK", false }, { UNKNOWN_PROPERTY_NAME, + "Unknown Property Name", true }, { ILLEGAL_PROPERTY_VALUE, + "Illegal Property Value", true }, { MISSING_PROPERTY, + "Missing Property", true }, { CONTINUE_TRANSACTION, + "Continue Transaction", false }, { FINISH_TRANSACTION, + "Finish Transaction", false }, { CONFIRM_TRANSACTION, + "Confirm Transaction", true }, { TRANSACTION_FINISHED, + "Transaction Finished", false }, { + TRANSACTION_FINISHED_BUT_DESTINATION_FULL, + "Transaction Finished But Destination is Full", false }, { + CANCEL_TRANSACTION, "Cancel Transaction", true }, { BAD_CHECKSUM, + "Bad Checksum", false }, { MORE_DATA, "More Data Exists", false }, { + NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT, + "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE, + "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL, + "Port's Destination is Full", false }, { UNAUTHORIZED, + "User Not Authorized", true }, { ABORT, "Abort", true }, { + UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, { + END_OF_STREAM, "End of Stream", false } }; + +// Respond Code Sequence Pattern static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R'; static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C'; @@ -200,43 +208,44 @@ static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C'; * Protocol. */ typedef enum { - /** - * Boolean value indicating whether or not the contents of a FlowFile should - * be GZipped when transferred. - */ - GZIP, - /** - * The unique identifier of the port to communicate with - */ - PORT_IDENTIFIER, - /** - * Indicates the number of milliseconds after the request was made that the - * client will wait for a response. If no response has been received by the - * time this value expires, the server can move on without attempting to - * service the request because the client will have already disconnected. - */ - REQUEST_EXPIRATION_MILLIS, - /** - * The preferred number of FlowFiles that the server should send to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. - */ - BATCH_COUNT, - /** - * The preferred number of bytes that the server should send to the client - * when pulling data. This property was introduced in version 5 of the - * protocol. - */ - BATCH_SIZE, - /** - * The preferred amount of time that the server should send data to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. Value is in milliseconds. - */ - BATCH_DURATION, MAX_HANDSHAKE_PROPERTY + /** + * Boolean value indicating whether or not the contents of a FlowFile should + * be GZipped when transferred. + */ + GZIP, + /** + * The unique identifier of the port to communicate with + */ + PORT_IDENTIFIER, + /** + * Indicates the number of milliseconds after the request was made that the + * client will wait for a response. If no response has been received by the + * time this value expires, the server can move on without attempting to + * service the request because the client will have already disconnected. + */ + REQUEST_EXPIRATION_MILLIS, + /** + * The preferred number of FlowFiles that the server should send to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. + */ + BATCH_COUNT, + /** + * The preferred number of bytes that the server should send to the client + * when pulling data. This property was introduced in version 5 of the + * protocol. + */ + BATCH_SIZE, + /** + * The preferred amount of time that the server should send data to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. Value is in milliseconds. + */ + BATCH_DURATION, + MAX_HANDSHAKE_PROPERTY } HandshakeProperty; -//! HandShakeProperty Str +// HandShakeProperty Str static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = { /** * Boolean value indicating whether or not the contents of a FlowFile should @@ -275,89 +284,89 @@ static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = { class Site2SiteClientProtocol; -//! Transaction Class +// Transaction Class class Transaction { - friend class Site2SiteClientProtocol; -public: - //! Constructor - /*! - * Create a new transaction - */ - explicit Transaction(TransferDirection direction, - CRCStream<Site2SitePeer> &stream) : - crcStream(std::move(stream)) { - _state = TRANSACTION_STARTED; - _direction = direction; - _dataAvailable = false; - _transfers = 0; - _bytes = 0; - - char uuidStr[37]; - - // Generate the global UUID for the transaction - uuid_generate(_uuid); - uuid_unparse_lower(_uuid, uuidStr); - _uuidStr = uuidStr; - } - //! Destructor - virtual ~Transaction() { - } - //! getUUIDStr - std::string getUUIDStr() { - return _uuidStr; - } - //! getState - TransactionState getState() { - return _state; - } - //! isDataAvailable - bool isDataAvailable() { - return _dataAvailable; - } - //! setDataAvailable() - void setDataAvailable(bool value) { - _dataAvailable = value; - } - //! getDirection - TransferDirection getDirection() { - return _direction; - } - //! getCRC - long getCRC() { - return crcStream.getCRC(); - } - //! updateCRC - void updateCRC(uint8_t *buffer, uint32_t length) { - crcStream.updateCRC(buffer, length); - } - - CRCStream<Site2SitePeer> &getStream() { - return crcStream; - } - - - Transaction(const Transaction &parent) = delete; - Transaction &operator=(const Transaction &parent) = delete; - -protected: - -private: - - CRCStream<Site2SitePeer> crcStream; - //! Transaction State - TransactionState _state; - //! Transaction Direction - TransferDirection _direction; - //! Whether received data is available - bool _dataAvailable; - //! A global unique identifier - uuid_t _uuid; - //! UUID string - std::string _uuidStr; - //! Number of transfer - int _transfers; - //! Number of content bytes - uint64_t _bytes; + friend class Site2SiteClientProtocol; + public: + // Constructor + /*! + * Create a new transaction + */ + explicit Transaction( + TransferDirection direction, + org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> &stream) + : crcStream(std::move(stream)) { + _state = TRANSACTION_STARTED; + _direction = direction; + _dataAvailable = false; + _transfers = 0; + _bytes = 0; + + char uuidStr[37]; + + // Generate the global UUID for the transaction + uuid_generate(_uuid); + uuid_unparse_lower(_uuid, uuidStr); + _uuidStr = uuidStr; + } + // Destructor + virtual ~Transaction() { + } + // getUUIDStr + std::string getUUIDStr() { + return _uuidStr; + } + // getState + TransactionState getState() { + return _state; + } + // isDataAvailable + bool isDataAvailable() { + return _dataAvailable; + } + // setDataAvailable() + void setDataAvailable(bool value) { + _dataAvailable = value; + } + // getDirection + TransferDirection getDirection() { + return _direction; + } + // getCRC + long getCRC() { + return crcStream.getCRC(); + } + // updateCRC + void updateCRC(uint8_t *buffer, uint32_t length) { + crcStream.updateCRC(buffer, length); + } + + org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> &getStream() { + return crcStream; + } + + Transaction(const Transaction &parent) = delete; + Transaction &operator=(const Transaction &parent) = delete; + + protected: + + private: + + org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcStream; + // Transaction State + TransactionState _state; + // Transaction Direction + TransferDirection _direction; + // Whether received data is available + bool _dataAvailable; + // A global unique identifier + uuid_t _uuid; + // UUID string + std::string _uuidStr; + // Number of transfer + int _transfers; + // Number of content bytes + uint64_t _bytes; }; @@ -366,264 +375,268 @@ private: * NiFi instance. */ class DataPacket { -public: - DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction, - std::map<std::string, std::string> attributes) { - _protocol = protocol; - _size = 0; - _transaction = transaction; - _attributes = attributes; - } - std::map<std::string, std::string> _attributes; - uint64_t _size; - Site2SiteClientProtocol *_protocol; - Transaction *_transaction; + public: + DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction, + std::map<std::string, std::string> attributes) { + _protocol = protocol; + _size = 0; + _transaction = transaction; + _attributes = attributes; + } + std::map<std::string, std::string> _attributes; + uint64_t _size; + Site2SiteClientProtocol *_protocol; + Transaction *_transaction; }; -//! Site2SiteClientProtocol Class +// Site2SiteClientProtocol Class class Site2SiteClientProtocol { -public: - //! Constructor - /*! - * Create a new control protocol - */ - Site2SiteClientProtocol(Site2SitePeer *peer) { - logger_ = Logger::getLogger(); - configure_ = Configure::getConfigure(); - peer_ = peer; - _batchSize = 0; - _batchCount = 0; - _batchDuration = 0; - _batchSendNanos = 5000000000; // 5 seconds - _timeOut = 30000; // 30 seconds - _peerState = IDLE; - _supportedVersion[0] = 5; - _supportedVersion[1] = 4; - _supportedVersion[2] = 3; - _supportedVersion[3] = 2; - _supportedVersion[4] = 1; - _currentVersion = _supportedVersion[0]; - _currentVersionIndex = 0; - _supportedCodecVersion[0] = 1; - _currentCodecVersion = _supportedCodecVersion[0]; - _currentCodecVersionIndex = 0; - } - //! Destructor - virtual ~Site2SiteClientProtocol() { - } - -public: - //! setBatchSize - void setBatchSize(uint64_t size) { - _batchSize = size; - } - //! setBatchCount - void setBatchCount(uint64_t count) { - _batchCount = count; - } - //! setBatchDuration - void setBatchDuration(uint64_t duration) { - _batchDuration = duration; - } - //! setTimeOut - void setTimeOut(uint64_t time) { - _timeOut = time; - if (peer_) - peer_->setTimeOut(time); - - } - - void setPeer(Site2SitePeer *peer) - { - peer_ = peer; - } - /** - * Provides a reference to the time out - * @returns timeout - */ - const uint64_t getTimeOut() const { - return _timeOut; - } - - /** - * Provides a reference to the port identifier - * @returns port identifier - */ - const std::string getPortId() const { - return _portIdStr; - } - //! setPortId - void setPortId(uuid_t id) { - uuid_copy(_portId, id); - char idStr[37]; - uuid_unparse_lower(id, idStr); - _portIdStr = idStr; - } - //! getResourceName - std::string getResourceName() { - return "SocketFlowFileProtocol"; - } - //! getCodecResourceName - std::string getCodecResourceName() { - return "StandardFlowFileCodec"; - } - //! bootstrap the protocol to the ready for transaction state by going through the state machine - bool bootstrap(); - //! establish - bool establish(); - //! handShake - bool handShake(); - //! negotiateCodec - bool negotiateCodec(); - //! initiateResourceNegotiation - bool initiateResourceNegotiation(); - //! initiateCodecResourceNegotiation - bool initiateCodecResourceNegotiation(); - //! tearDown - void tearDown(); - //! write Request Type - int writeRequestType(RequestType type); - //! read Request Type - int readRequestType(RequestType &type); - //! read Respond - int readRespond(RespondCode &code, std::string &message); - //! write respond - int writeRespond(RespondCode code, std::string message); - //! getRespondCodeContext - RespondCodeContext *getRespondCodeContext(RespondCode code) { - for (unsigned int i = 0; - i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); - i++) { - if (respondCodeContext[i].code == code) { - return &respondCodeContext[i]; - } - } - return NULL; - } - //! getPeer - Site2SitePeer *getPeer() { - return peer_; - } - //! Creation of a new transaction, return the transaction ID if success, - //! Return NULL when any error occurs - Transaction *createTransaction(std::string &transactionID, - TransferDirection direction); - //! Receive the data packet from the transaction - //! Return false when any error occurs - bool receive(std::string transactionID, DataPacket *packet, bool &eof); - //! Send the data packet from the transaction - //! Return false when any error occurs - bool send(std::string transactionID, DataPacket *packet, - FlowFileRecord *flowFile, ProcessSession *session); - //! Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received. - bool confirm(std::string transactionID); - //! Cancel the transaction - void cancel(std::string transactionID); - //! Complete the transaction - bool complete(std::string transactionID); - //! Error the transaction - void error(std::string transactionID); - //! Receive flow files for the process session - void receiveFlowFiles(ProcessContext *context, ProcessSession *session); - //! Transfer flow files for the process session - void transferFlowFiles(ProcessContext *context, ProcessSession *session); - //! deleteTransaction - void deleteTransaction(std::string transactionID); - //! Nest Callback Class for write stream - class WriteCallback: public OutputStreamCallback { - public: - WriteCallback(DataPacket *packet) : - _packet(packet) { - } - DataPacket *_packet; - void process(std::ofstream *stream) { - uint8_t buffer[8192]; - int len = _packet->_size; - while (len > 0) { - int size = std::min(len, (int) sizeof(buffer)); - int ret = _packet->_transaction->getStream().readData(buffer,size); - if (ret != size) { - _packet->_protocol->logger_->log_error( - "Site2Site Receive Flow Size %d Failed %d", size, - ret); - break; - } - stream->write((const char *) buffer, size); - len -= size; - } - } - }; - //! Nest Callback Class for read stream - class ReadCallback: public InputStreamCallback { - public: - ReadCallback(DataPacket *packet) : - _packet(packet) { - } - DataPacket *_packet; - void process(std::ifstream *stream) { - _packet->_size = 0; - uint8_t buffer[8192]; - int readSize; - while (stream->good()) { - if (!stream->read((char *) buffer, 8192)) - readSize = stream->gcount(); - else - readSize = 8192; - int ret = _packet->_transaction->getStream().writeData(buffer,readSize); - if (ret != readSize) { - _packet->_protocol->logger_->log_error( - "Site2Site Send Flow Size %d Failed %d", readSize, - ret); - break; - } - _packet->_size += readSize; - } - } - }; - -protected: - -private: - - //! Mutex for protection - std::mutex _mtx; - //! Logger - std::shared_ptr<Logger> logger_; - //! Configure - Configure *configure_; - //! Batch Count - std::atomic<uint64_t> _batchCount; - //! Batch Size - std::atomic<uint64_t> _batchSize; - //! Batch Duration in msec - std::atomic<uint64_t> _batchDuration; - //! Timeout in msec - std::atomic<uint64_t> _timeOut; - //! Peer Connection - Site2SitePeer *peer_; - //! portId - uuid_t _portId; - //! portIDStr - std::string _portIdStr; - //! BATCH_SEND_NANOS - uint64_t _batchSendNanos; - //! Peer State - PeerState _peerState; - uint32_t _supportedVersion[5]; - uint32_t _currentVersion; - int _currentVersionIndex; - uint32_t _supportedCodecVersion[1]; - uint32_t _currentCodecVersion; - int _currentCodecVersionIndex; - //! commsIdentifier - std::string _commsIdentifier; - //! transaction map - std::map<std::string, Transaction *> _transactionMap; - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Site2SiteClientProtocol(const Site2SiteClientProtocol &parent); - Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol &parent); + public: + // Constructor + /*! + * Create a new control protocol + */ + Site2SiteClientProtocol(std::unique_ptr<Site2SitePeer> peer) { + logger_ = logging::Logger::getLogger(); + configure_ = Configure::getConfigure(); + peer_ = std::move(peer); + _batchSize = 0; + _batchCount = 0; + _batchDuration = 0; + _batchSendNanos = 5000000000; // 5 seconds + _timeOut = 30000; // 30 seconds + _peerState = IDLE; + _supportedVersion[0] = 5; + _supportedVersion[1] = 4; + _supportedVersion[2] = 3; + _supportedVersion[3] = 2; + _supportedVersion[4] = 1; + _currentVersion = _supportedVersion[0]; + _currentVersionIndex = 0; + _supportedCodecVersion[0] = 1; + _currentCodecVersion = _supportedCodecVersion[0]; + _currentCodecVersionIndex = 0; + } + // Destructor + virtual ~Site2SiteClientProtocol() { + tearDown(); + } + + public: + // setBatchSize + void setBatchSize(uint64_t size) { + _batchSize = size; + } + // setBatchCount + void setBatchCount(uint64_t count) { + _batchCount = count; + } + // setBatchDuration + void setBatchDuration(uint64_t duration) { + _batchDuration = duration; + } + // setTimeOut + void setTimeOut(uint64_t time) { + _timeOut = time; + if (peer_) + peer_->setTimeOut(time); + + } + + void setPeer(std::unique_ptr<Site2SitePeer> peer) { + peer_ = std::move(peer); + } + /** + * Provides a reference to the time out + * @returns timeout + */ + const uint64_t getTimeOut() const { + return _timeOut; + } + + /** + * Provides a reference to the port identifier + * @returns port identifier + */ + const std::string getPortId() const { + return _portIdStr; + } + // setPortId + void setPortId(uuid_t id) { + uuid_copy(_portId, id); + char idStr[37]; + uuid_unparse_lower(id, idStr); + _portIdStr = idStr; + } + // getResourceName + std::string getResourceName() { + return "SocketFlowFileProtocol"; + } + // getCodecResourceName + std::string getCodecResourceName() { + return "StandardFlowFileCodec"; + } + // bootstrap the protocol to the ready for transaction state by going through the state machine + bool bootstrap(); + // establish + bool establish(); + // handShake + bool handShake(); + // negotiateCodec + bool negotiateCodec(); + // initiateResourceNegotiation + bool initiateResourceNegotiation(); + // initiateCodecResourceNegotiation + bool initiateCodecResourceNegotiation(); + // tearDown + void tearDown(); + // write Request Type + int writeRequestType(RequestType type); + // read Request Type + int readRequestType(RequestType &type); + // read Respond + int readRespond(RespondCode &code, std::string &message); + // write respond + int writeRespond(RespondCode code, std::string message); + // getRespondCodeContext + RespondCodeContext *getRespondCodeContext(RespondCode code) { + for (unsigned int i = 0; + i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); i++) { + if (respondCodeContext[i].code == code) { + return &respondCodeContext[i]; + } + } + return NULL; + } + + // Creation of a new transaction, return the transaction ID if success, + // Return NULL when any error occurs + Transaction *createTransaction(std::string &transactionID, + TransferDirection direction); + // Receive the data packet from the transaction + // Return false when any error occurs + bool receive(std::string transactionID, DataPacket *packet, bool &eof); + // Send the data packet from the transaction + // Return false when any error occurs + bool send(std::string transactionID, DataPacket *packet, + std::shared_ptr<FlowFileRecord> flowFile, + core::ProcessSession *session); + // Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received. + bool confirm(std::string transactionID); + // Cancel the transaction + void cancel(std::string transactionID); + // Complete the transaction + bool complete(std::string transactionID); + // Error the transaction + void error(std::string transactionID); + // Receive flow files for the process session + void receiveFlowFiles( + core::ProcessContext *context, + core::ProcessSession *session); + // Transfer flow files for the process session + void transferFlowFiles( + core::ProcessContext *context, + core::ProcessSession *session); + // deleteTransaction + void deleteTransaction(std::string transactionID); + // Nest Callback Class for write stream + class WriteCallback : public OutputStreamCallback { + public: + WriteCallback(DataPacket *packet) + : _packet(packet) { + } + DataPacket *_packet; + void process(std::ofstream *stream) { + uint8_t buffer[8192]; + int len = _packet->_size; + while (len > 0) { + int size = std::min(len, (int) sizeof(buffer)); + int ret = _packet->_transaction->getStream().readData(buffer, size); + if (ret != size) { + _packet->_protocol->logger_->log_error( + "Site2Site Receive Flow Size %d Failed %d", size, ret); + break; + } + stream->write((const char *) buffer, size); + len -= size; + } + } + }; + // Nest Callback Class for read stream + class ReadCallback : public InputStreamCallback { + public: + ReadCallback(DataPacket *packet) + : _packet(packet) { + } + DataPacket *_packet; + void process(std::ifstream *stream) { + _packet->_size = 0; + uint8_t buffer[8192]; + int readSize; + while (stream->good()) { + if (!stream->read((char *) buffer, 8192)) + readSize = stream->gcount(); + else + readSize = 8192; + int ret = _packet->_transaction->getStream().writeData(buffer, + readSize); + if (ret != readSize) { + _packet->_protocol->logger_->log_error( + "Site2Site Send Flow Size %d Failed %d", readSize, ret); + break; + } + _packet->_size += readSize; + } + } + }; + + protected: + + private: + + // Mutex for protection + std::mutex mutex_; + // Logger + std::shared_ptr<logging::Logger> logger_; + // Configure + Configure *configure_; + // Batch Count + std::atomic<uint64_t> _batchCount; + // Batch Size + std::atomic<uint64_t> _batchSize; + // Batch Duration in msec + std::atomic<uint64_t> _batchDuration; + // Timeout in msec + std::atomic<uint64_t> _timeOut; + // Peer Connection + std::unique_ptr<Site2SitePeer> peer_; + // portId + uuid_t _portId; + // portIDStr + std::string _portIdStr; + // BATCH_SEND_NANOS + uint64_t _batchSendNanos; + // Peer State + PeerState _peerState; + uint32_t _supportedVersion[5]; + uint32_t _currentVersion; + int _currentVersionIndex; + uint32_t _supportedCodecVersion[1]; + uint32_t _currentCodecVersion; + int _currentCodecVersionIndex; + // commsIdentifier + std::string _commsIdentifier; + // transaction map + std::map<std::string, Transaction *> _transactionMap; + + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + Site2SiteClientProtocol(const Site2SiteClientProtocol &parent); + Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol &parent); }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif
