lordgamez commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1043352711


##########
extensions/mqtt/processors/PublishMQTT.h:
##########
@@ -62,72 +68,116 @@ class PublishMQTT : public 
processors::AbstractMQTTProcessor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  class ReadCallback {
+  void readProperties(const std::shared_ptr<core::ProcessContext>& context) 
override;
+  void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, 
const std::shared_ptr<core::ProcessSession>& session) override;
+  void initialize() override;
+
+ private:
+  /**
+   * Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's 
Receive Maximum
+   */
+  class InFlightMessageCounter {
    public:
-    ReadCallback(PublishMQTT* processor, uint64_t flow_size, uint64_t 
max_seg_size, std::string topic, MQTTAsync client, int qos, bool retain)
-        : processor_(processor),
-          flow_size_(flow_size),
-          max_seg_size_(max_seg_size),
-          topic_(std::move(topic)),
-          client_(client),
-          qos_(qos),
-          retain_(retain) {
+    void setMqttVersion(const MqttVersions mqtt_version) {
+      mqtt_version_ = mqtt_version;
+    }
+
+    void setQoS(const MqttQoS qos) {
+      qos_ = qos;
+    }
+
+    void setMax(const uint16_t new_limit) {
+      limit_ = new_limit;
     }
 
-    int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
+    // increase on sending, wait if limit is reached
+    void increase();
 
-    size_t read_size_ = 0;
-    int status_ = 0;
+    // decrease on success or failure, notify
+    void decrease();
 
    private:
-    PublishMQTT* processor_;
-    uint64_t flow_size_;
-    uint64_t max_seg_size_;
-    std::string topic_;
-    MQTTAsync client_;
-
-    int qos_;
-    bool retain_;
+    std::mutex mutex_;
+    std::condition_variable cv_;
+    uint16_t counter_{0};
+    uint16_t limit_{MQTT_MAX_RECEIVE_MAXIMUM};
+    MqttVersions mqtt_version_;
+    MqttQoS qos_;
   };
 
-  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &factory) override;
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) override;
-  void initialize() override;
+  // MQTT static async callbacks, calling their notify with context being 
pointer to a packaged_task to notify()
+  static void sendSuccess(void* context, MQTTAsync_successData* response);
+  static void sendSuccess5(void* context, MQTTAsync_successData5* response);
+  static void sendFailure(void* context, MQTTAsync_failureData* response);
+  static void sendFailure5(void* context, MQTTAsync_failureData5* response);
+
+  /**
+   * Resolves topic from expression language
+   */
+  std::string getTopic(const std::shared_ptr<core::ProcessContext>& context, 
const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Resolves content type from expression language
+   */
+  std::string getContentType(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Sends an MQTT message asynchronously
+   * @param buffer contents of the message
+   * @param topic topic of the message
+   * @param content_type Content Type for MQTT 5
+   * @param flow_file Flow File being processed
+   * @return success of message sending
+   */
+  bool sendMessage(const std::vector<std::byte>& buffer, const std::string& 
topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& 
flow_file);
+
+  /**
+   * Callback for asynchronous message sending
+   * @param success if message sending was successful
+   * @param response_code response code for failure only
+   * @param reason_code MQTT 5 reason code
+   * @return if message sending was successful
+   */
+  bool notify(bool success, std::optional<int> response_code, 
std::optional<MQTTReasonCodes> reason_code);
+
+  /**
+   * Set MQTT 5-exclusive properties
+   * @param message message object
+   * @param content_type content type
+   * @param flow_file Flow File being processed
+   */
+  void setMqtt5Properties(MQTTAsync_message& message, const std::string& 
content_type, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Adds flow file attributes as user properties to an MQTT 5 message
+   * @param message message object
+   * @param flow_file Flow File being processed
+   */
+  static void addAttributesAsUserProperties(MQTTAsync_message& message, const 
std::shared_ptr<core::FlowFile>& flow_file);
 
- private:
-  // MQTT async callback
-  static void sendSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<PublishMQTT*>(context);
-    processor->onSendSuccess(response);
-  }
-
-  // MQTT async callback
-  static void sendFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<PublishMQTT*>(context);
-    processor->onSendFailure(response);
+  bool getCleanSession() const override {
+    return true;
   }
 
-  void onSendSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_debug("Successfully sent message to MQTT topic %s on broker 
%s", topic_, uri_);
+  bool getCleanStart() const override {
+    return true;
   }
 
-  void onSendFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Sending message failed on topic %s to MQTT broker %s 
(%d)", topic_, uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for sending failure: %s", 
response->message);
-    }
+  std::chrono::seconds getSessionExpiryInterval() const override {
+    // non-persistent session as we only publish
+    return std::chrono::seconds{0};
   }
 
-  bool getCleanSession() const override {
-    return true;
+  void startupClient() override {
+    // there is no need to do anything like subscribe in the beginning
   }
 
-  bool startupClient() override {
-    // there is no need to do anything like subscribe on the beginning
-    return true;
-  }
+  void checkProperties() override;
+  void checkBrokerLimitsImpl() override;
 
   bool retain_ = false;
+  std::optional<std::chrono::seconds> message_expiry_interval_;
+  InFlightMessageCounter in_flight_message_counter_;

Review Comment:
   We can define processor-specific metrics in addition to the general 
   processor metrics we have for all processors, such as transferred flow file 
   count, onTrigger calls, etc. For example, `GetFileMetrics` extends these 
   metrics with 2 additional ones, and maybe an `in flight message count` 
   could also be added for `PublishMQTTMetrics`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to