[
https://issues.apache.org/jira/browse/MINIFI-217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903790#comment-15903790
]
ASF GitHub Bot commented on MINIFI-217:
---------------------------------------
Github user jdye64 commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/63#discussion_r105259321
--- Diff: libminifi/include/Connection.h ---
@@ -28,174 +28,168 @@
#include <atomic>
#include <algorithm>
-#include "FlowFileRecord.h"
-#include "Logger.h"
-#include "Relationship.h"
+#include "core/Connectable.h"
+#include "core/Record.h"
+#include "core/logging/Logger.h"
+#include "core/Relationship.h"
+#include "core/Connectable.h"
-//! Forwarder declaration
-class Processor;
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
//! Connection Class
-class Connection
-{
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- explicit Connection(std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = NULL);
- //! Destructor
- virtual ~Connection() {}
- //! Set Connection Name
- void setName(std::string name) {
- _name = name;
- }
- //! Get Process Name
- std::string getName(void) {
- return (_name);
- }
- //! Set UUID
- void setUUID(uuid_t uuid) {
- uuid_copy(_uuid, uuid);
- }
- //! Set Source Processor UUID
- void setSourceProcessorUUID(uuid_t uuid) {
- uuid_copy(_srcUUID, uuid);
- }
- //! Set Destination Processor UUID
- void setDestinationProcessorUUID(uuid_t uuid) {
- uuid_copy(_destUUID, uuid);
- }
- //! Get Source Processor UUID
- void getSourceProcessorUUID(uuid_t uuid) {
- uuid_copy(uuid, _srcUUID);
- }
- //! Get Destination Processor UUID
- void getDestinationProcessorUUID(uuid_t uuid) {
- uuid_copy(uuid, _destUUID);
- }
- //! Get UUID
- bool getUUID(uuid_t uuid) {
- if (uuid)
- {
- uuid_copy(uuid, _uuid);
- return true;
- }
- else
- return false;
- }
- //! Set Connection Source Processor
- void setSourceProcessor(Processor *source) {
- _srcProcessor = source;
- }
- // ! Get Connection Source Processor
- Processor *getSourceProcessor() {
- return _srcProcessor;
- }
- //! Set Connection Destination Processor
- void setDestinationProcessor(Processor *dest) {
- _destProcessor = dest;
- }
- // ! Get Connection Destination Processor
- Processor *getDestinationProcessor() {
- return _destProcessor;
- }
- //! Set Connection relationship
- void setRelationship(Relationship relationship) {
- _relationship = relationship;
- }
- // ! Get Connection relationship
- Relationship getRelationship() {
- return _relationship;
- }
- //! Set Max Queue Size
- void setMaxQueueSize(uint64_t size)
- {
- _maxQueueSize = size;
- }
- //! Get Max Queue Size
- uint64_t getMaxQueueSize()
- {
- return _maxQueueSize;
- }
- //! Set Max Queue Data Size
- void setMaxQueueDataSize(uint64_t size)
- {
- _maxQueueDataSize = size;
- }
- //! Get Max Queue Data Size
- uint64_t getMaxQueueDataSize()
- {
- return _maxQueueDataSize;
- }
- //! Set Flow expiration duration in millisecond
- void setFlowExpirationDuration(uint64_t duration)
- {
- _expiredDuration = duration;
- }
- //! Get Flow expiration duration in millisecond
- uint64_t getFlowExpirationDuration()
- {
- return _expiredDuration;
- }
- //! Check whether the queue is empty
- bool isEmpty();
- //! Check whether the queue is full to apply back pressure
- bool isFull();
- //! Get queue size
- uint64_t getQueueSize() {
- std::lock_guard<std::mutex> lock(_mtx);
- return _queue.size();
- }
- //! Get queue data size
- uint64_t getQueueDataSize()
- {
- return _maxQueueDataSize;
- }
- //! Put the flow file into queue
- void put(FlowFileRecord *flow);
- //! Poll the flow file from queue, the expired flow file record also being returned
- FlowFileRecord *poll(std::set<FlowFileRecord *> &expiredFlowRecords);
- //! Drain the flow records
- void drain();
+class Connection : public core::Connectable,
+ public std::enable_shared_from_this<Connection> {
+ public:
+ //! Constructor
+ /*!
+ * Create a new processor
+ */
+ explicit Connection(std::string name, uuid_t uuid = NULL, uuid_t srcUUID =
+ NULL,
+ uuid_t destUUID = NULL);
+ //! Destructor
+ virtual ~Connection() {
+ }
-protected:
- //! A global unique identifier
- uuid_t _uuid;
- //! Source Processor UUID
- uuid_t _srcUUID;
- //! Destination Processor UUID
- uuid_t _destUUID;
- //! Connection Name
- std::string _name;
- //! Relationship for this connection
- Relationship _relationship;
- //! Source Processor (ProcessNode/Port)
- Processor *_srcProcessor;
- //! Destination Processor (ProcessNode/Port)
- Processor *_destProcessor;
- //! Max queue size to apply back pressure
- std::atomic<uint64_t> _maxQueueSize;
- //! Max queue data size to apply back pressure
- std::atomic<uint64_t> _maxQueueDataSize;
- //! Flow File Expiration Duration in= MilliSeconds
- std::atomic<uint64_t> _expiredDuration;
+ //! Set Source Processor UUID
+ void setSourceUUID(uuid_t uuid) {
+ uuid_copy(_srcUUID, uuid);
+ }
+ //! Set Destination Processor UUID
+ void setDestinationUUID(uuid_t uuid) {
+ uuid_copy(_destUUID, uuid);
+ }
+ //! Get Source Processor UUID
+ void getSourceUUID(uuid_t uuid) {
+ uuid_copy(uuid, _srcUUID);
+ }
+ //! Get Destination Processor UUID
+ void getDestinationUUID(uuid_t uuid) {
+ uuid_copy(uuid, _destUUID);
+ }
+ //! Set Connection Source Processor
+ void setSource(
+ std::shared_ptr<core::Connectable> source) {
+ _srcProcessor = source;
+ }
+ // ! Get Connection Source Processor
+ std::shared_ptr<core::Connectable> getSource() {
+ return _srcProcessor;
+ }
+ //! Set Connection Destination Processor
+ void setDestination(
+ std::shared_ptr<core::Connectable> dest) {
+ _destProcessor = dest;
+ }
+ // ! Get Connection Destination Processor
+ std::shared_ptr<core::Connectable> getDestination() {
+ return _destProcessor;
+ }
+ //! Set Connection relationship
+ void setRelationship(
+ core::Relationship relationship) {
+ relationship_ = relationship;
+ }
+ // ! Get Connection relationship
+ core::Relationship getRelationship() {
+ return relationship_;
+ }
+ //! Set Max Queue Size
+ void setMaxQueueSize(uint64_t size) {
+ _maxQueueSize = size;
+ }
+ //! Get Max Queue Size
+ uint64_t getMaxQueueSize() {
+ return _maxQueueSize;
+ }
+ //! Set Max Queue Data Size
+ void setMaxQueueDataSize(uint64_t size) {
+ _maxQueueDataSize = size;
+ }
+ //! Get Max Queue Data Size
+ uint64_t getMaxQueueDataSize() {
+ return _maxQueueDataSize;
+ }
+ //! Set Flow expiration duration in millisecond
+ void setFlowExpirationDuration(uint64_t duration) {
+ _expiredDuration = duration;
+ }
+ //! Get Flow expiration duration in millisecond
+ uint64_t getFlowExpirationDuration() {
+ return _expiredDuration;
+ }
+ //! Check whether the queue is empty
+ bool isEmpty();
+ //! Check whether the queue is full to apply back pressure
+ bool isFull();
+ //! Get queue size
+ uint64_t getQueueSize() {
+ std::lock_guard<std::mutex> lock(_mtx);
+ return queue_.size();
+ }
+ //! Get queue data size
+ uint64_t getQueueDataSize() {
+ return _maxQueueDataSize;
+ }
+ //! Put the flow file into queue
+ void put(std::shared_ptr<core::Record> flow);
+ //! Poll the flow file from queue, the expired flow file record also being returned
+ std::shared_ptr<core::Record> poll(
+ std::set<std::shared_ptr<core::Record>> &expiredFlowRecords);
+ //! Drain the flow records
+ void drain();
-private:
- //! Mutex for protection
- std::mutex _mtx;
- //! Queued data size
- std::atomic<uint64_t> _queuedDataSize;
- //! Queue for the Flow File
- std::queue<FlowFileRecord *> _queue;
- //! Logger
- std::shared_ptr<Logger> logger_;
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- Connection(const Connection &parent);
- Connection &operator=(const Connection &parent);
+ void yield() {
-};
+ }
+
+ bool isWorkAvailable() {
+ return !isEmpty();
+ }
+
+ bool isRunning() {
+ return true;
+ }
+ protected:
+ //! Source Processor UUID
+ uuid_t _srcUUID;
--- End diff --
Can you move the "_" to the end of the class data member names (e.g. `srcUUID_`
instead of `_srcUUID`) to adhere to the Google C++ Style Guide?
> Move to org::apache::nifi::minifi namespace in CPP agent
> --------------------------------------------------------
>
> Key: MINIFI-217
> URL: https://issues.apache.org/jira/browse/MINIFI-217
> Project: Apache NiFi MiNiFi
> Issue Type: Bug
> Components: C++
> Affects Versions: cpp-0.1.0, cpp-0.2.0
> Reporter: marco polo
> Priority: Trivial
> Original Estimate: 504h
> Remaining Estimate: 504h
>
> Move code to a more controlled namespace. I suggest org::apache::nifi::minifi
> since that more closely reflects the JAVA package.
> Suggest Processor namespace be
> org::apache::nifi::minifi::processors
> Suggest I/O namespace be
> org::apache::nifi::minifi::io
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)