lordgamez commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1031637892


##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -137,19 +148,45 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContex
 
 void AbstractMQTTProcessor::reconnect() {
   if (!client_) {
-    logger_->log_error("MQTT client is not existing while trying to 
reconnect");
-    return;
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
client is not existing while trying to reconnect");
   }
   if (MQTTAsync_isConnected(client_)) {
-    logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
     return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
+
+  MQTTAsync_connectOptions conn_opts;
+  MQTTProperties connect_props = MQTTProperties_initializer;
+  MQTTProperties will_props = MQTTProperties_initializer;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    conn_opts = MQTTAsync_connectOptions_initializer5;
+    conn_opts.onSuccess5 = connectionSuccess5;
+    conn_opts.onFailure5 = connectionFailure5;
+    conn_opts.connectProperties = &connect_props;
+  } else {
+    conn_opts = MQTTAsync_connectOptions_initializer;
+    conn_opts.onSuccess = connectionSuccess;
+    conn_opts.onFailure = connectionFailure;
+  }
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+    conn_opts.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+    conn_opts.MQTTVersion = MQTTVERSION_3_1_1;
+  }
+
   conn_opts.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    setMqtt5ConnectOptions(conn_opts, connect_props, will_props);

Review Comment:
   I think we could refactor this to have a `setConnectOptions` that calls the 
specific `setMqtt5ConnectOptions` and `setMqtt3ConnectOptions` functions 
depending on the version and move all `conn_opts` operations there. That would 
make reading the version specific option settings easier.



##########
PROCESSORS.md:
##########
@@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a 
MQTT broker for a specifie
 
 In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description       
                                                                                
                          |
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
-| **Broker URI**        |               |                  | The URI to use to 
connect to the MQTT broker                                                      
                          |
-| **Topic**             |               |                  | The topic to 
subscribe to                                                                    
                               |
-| Client ID             |               |                  | MQTT client ID to 
use                                                                             
                          |
-| Quality of Service    | 0             |                  | The Quality of 
Service (QoS) to receive the message with. Accepts three values '0', '1' and 
'2'                             |
-| Connection Timeout    | 30 sec        |                  | Maximum time 
interval the client will wait for the network connection to the MQTT broker     
                               |
-| Keep Alive Interval   | 60 sec        |                  | Defines the 
maximum time interval between messages being sent to the broker                 
                                |
-| Max Flow Segment Size |               |                  | Maximum flow 
content payload segment size for the MQTT record                                
                               |
-| Last Will Topic       |               |                  | The topic to send 
the client's Last Will to. If the Last Will topic is not set then a Last Will 
will not be sent            |
-| Last Will Message     |               |                  | The message to 
send as the client's Last Will. If the Last Will Message is empty, Last Will 
will be deleted from the broker |
-| Last Will QoS         | 0             |                  | The Quality of 
Service (QoS) to send the last will with. Accepts three values '0', '1' and '2' 
                             |
-| Last Will Retain      | false         |                  | Whether to retain 
the client's Last Will                                                          
                          |
-| Security Protocol     |               |                  | Protocol used to 
communicate with brokers                                                        
                           |
-| Security CA           |               |                  | File or directory 
path to CA certificate(s) for verifying the broker's key                        
                          |
-| Security Cert         |               |                  | Path to client's 
public key (PEM) used for authentication                                        
                           |
-| Security Private Key  |               |                  | Path to client's 
private key (PEM) used for authentication                                       
                           |
-| Security Pass Phrase  |               |                  | Private key 
passphrase                                                                      
                                |
-| Username              |               |                  | Username to use 
when connecting to the broker                                                   
                            |
-| Password              |               |                  | Password to use 
when connecting to the broker                                                   
                            |
-| Clean Session         | true          |                  | Whether to start 
afresh rather than remembering previous subscriptions                           
                           |
-| Queue Max Message     | 1000          |                  | Maximum number of 
messages allowed on the received MQTT queue                                     
                          |
+| Name                        | Default Value | Allowable Values            | 
Description                                                                     
                                                                            |
+|-----------------------------|---------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI**              |               |                             | 
The URI to use to connect to the MQTT broker                                    
                                                                            |
+| Client ID                   |               |                             | 
MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!        
                                                                            |
+| MQTT Version                | 3.x AUTO      | 3.x AUTO, 3.1.0, 3.1.1, 5.0 | 
The MQTT specification version when connecting to the broker.                   
                                                                            |
+| **Topic**                   |               |                             | 
The topic to subscribe to.                                                      
                                                                            |
+| Clean Session               | true          |                             | 
Whether to start afresh rather than remembering previous subscriptions. Also 
make broker remember subscriptions after disconnected. WARNING: MQTT 3.x only. |
+| Clean Start                 | true          |                             | 
Whether to start afresh rather than remembering previous subscriptions. 
WARNING: MQTT 5.x only.                                                         
    |

Review Comment:
   Could these two properties be merged to one and used appropriately with each 
specific MQTT version?



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+    return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = 
gsl::narrow<int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_props, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = last_will_content_type_.length();
+    property.value.data.data = 
const_cast<char*>(last_will_content_type_.data());
+    MQTTProperties_add(&will_props, &property);
+  }
+
+  conn_opts.willProperties = &will_props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSession>& session) {
+  // read lock

Review Comment:
   I don't think we can be sure that this is always a read lock as it depends 
on the implementations of the `onTriggerImpl` in its subclasses. We can remove 
the comment.



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, 
MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = 
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& 
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = 
context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& 
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const 
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message->payload), 
gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
flow_file->getUUIDStr());
+      session->remove(flow_file);
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  std::queue<SmartMessage> msg_queue;
+  SmartMessage message;
+  while (queue_.try_dequeue(message)) {
+    msg_queue.push(std::move(message));
+  }
+  return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const 
std::shared_ptr<io::OutputStream>& stream) {
+  if (message_.contents->payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", 
message_.contents->payloadlen);
+    return -1;
+  }
+
+  const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), 
gsl::narrow<size_t>(message_.contents->payloadlen));
+  if (io::isError(len)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+
+  return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, 
const std::shared_ptr<core::FlowFile>& flow_file, const 
std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {

Review Comment:
   I think we rely too much on the mqtt_version_ and a lot of the function 
implementations are if-else branches depending on this version. A cleaner 
solution would be to collect these differing functionalities, create a separate 
abstract class for these functions with two separate inherited class 
implementations for the two versions. After we have the version in `onSchedule` 
we can instantiate the proper one and call the proper implementation in each 
case where it differs (strategy pattern). If needed the common parts can also 
be moved to the base class and only the differences implemented in the 
inherited classes (template method pattern).



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+    return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = 
gsl::narrow<int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_props, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = last_will_content_type_.length();
+    property.value.data.data = 
const_cast<char*>(last_will_content_type_.data());
+    MQTTProperties_add(&will_props, &property);
+  }
+
+  conn_opts.willProperties = &will_props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSession>& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+    // we are shutting down
+    return;
+  }
+
+  // reconnect if needed

Review Comment:
   I'm not sure this comment adds value.



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+    return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = 
gsl::narrow<int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_props, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = last_will_content_type_.length();
+    property.value.data.data = 
const_cast<char*>(last_will_content_type_.data());
+    MQTTProperties_add(&will_props, &property);
+  }
+
+  conn_opts.willProperties = &will_props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSession>& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+    // we are shutting down

Review Comment:
   Can we add a trace/debug log instead of this comment?



##########
extensions/mqtt/processors/PublishMQTT.h:
##########
@@ -62,72 +68,116 @@ class PublishMQTT : public 
processors::AbstractMQTTProcessor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  class ReadCallback {
+  void readProperties(const std::shared_ptr<core::ProcessContext>& context) 
override;
+  void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, 
const std::shared_ptr<core::ProcessSession>& session) override;
+  void initialize() override;
+
+ private:
+  /**
+   * Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's 
Receive Maximum
+   */
+  class InFlightMessageCounter {
    public:
-    ReadCallback(PublishMQTT* processor, uint64_t flow_size, uint64_t 
max_seg_size, std::string topic, MQTTAsync client, int qos, bool retain)
-        : processor_(processor),
-          flow_size_(flow_size),
-          max_seg_size_(max_seg_size),
-          topic_(std::move(topic)),
-          client_(client),
-          qos_(qos),
-          retain_(retain) {
+    void setMqttVersion(const MqttVersions mqtt_version) {
+      mqtt_version_ = mqtt_version;
+    }
+
+    void setQoS(const MqttQoS qos) {
+      qos_ = qos;
+    }
+
+    void setMax(const uint16_t new_limit) {
+      limit_ = new_limit;
     }
 
-    int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
+    // increase on sending, wait if limit is reached
+    void increase();
 
-    size_t read_size_ = 0;
-    int status_ = 0;
+    // decrease on success or failure, notify
+    void decrease();
 
    private:
-    PublishMQTT* processor_;
-    uint64_t flow_size_;
-    uint64_t max_seg_size_;
-    std::string topic_;
-    MQTTAsync client_;
-
-    int qos_;
-    bool retain_;
+    std::mutex mutex_;
+    std::condition_variable cv_;
+    uint16_t counter_{0};
+    uint16_t limit_{MQTT_MAX_RECEIVE_MAXIMUM};
+    MqttVersions mqtt_version_;
+    MqttQoS qos_;
   };
 
-  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &factory) override;
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) override;
-  void initialize() override;
+  // MQTT static async callbacks, calling their notify with context being 
pointer to a packaged_task to notify()
+  static void sendSuccess(void* context, MQTTAsync_successData* response);
+  static void sendSuccess5(void* context, MQTTAsync_successData5* response);
+  static void sendFailure(void* context, MQTTAsync_failureData* response);
+  static void sendFailure5(void* context, MQTTAsync_failureData5* response);
+
+  /**
+   * Resolves topic from expression language
+   */
+  std::string getTopic(const std::shared_ptr<core::ProcessContext>& context, 
const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Resolves content type from expression language
+   */
+  std::string getContentType(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Sends an MQTT message asynchronously
+   * @param buffer contents of the message
+   * @param topic topic of the message
+   * @param content_type Content Type for MQTT 5
+   * @param flow_file Flow File being processed
+   * @return success of message sending
+   */
+  bool sendMessage(const std::vector<std::byte>& buffer, const std::string& 
topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& 
flow_file);
+
+  /**
+   * Callback for asynchronous message sending
+   * @param success if message sending was successful
+   * @param response_code response code for failure only
+   * @param reason_code MQTT 5 reason code
+   * @return if message sending was successful
+   */
+  bool notify(bool success, std::optional<int> response_code, 
std::optional<MQTTReasonCodes> reason_code);
+
+  /**
+   * Set MQTT 5-exclusive properties
+   * @param message message object
+   * @param content_type content type
+   * @param flow_file Flow File being processed
+   */
+  void setMqtt5Properties(MQTTAsync_message& message, const std::string& 
content_type, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  /**
+   * Adds flow file attributes as user properties to an MQTT 5 message
+   * @param message message object
+   * @param flow_file Flow File being processed
+   */
+  static void addAttributesAsUserProperties(MQTTAsync_message& message, const 
std::shared_ptr<core::FlowFile>& flow_file);
 
- private:
-  // MQTT async callback
-  static void sendSuccess(void* context, MQTTAsync_successData* response) {
-    auto* processor = reinterpret_cast<PublishMQTT*>(context);
-    processor->onSendSuccess(response);
-  }
-
-  // MQTT async callback
-  static void sendFailure(void* context, MQTTAsync_failureData* response) {
-    auto* processor = reinterpret_cast<PublishMQTT*>(context);
-    processor->onSendFailure(response);
+  bool getCleanSession() const override {
+    return true;
   }
 
-  void onSendSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_debug("Successfully sent message to MQTT topic %s on broker 
%s", topic_, uri_);
+  bool getCleanStart() const override {
+    return true;
   }
 
-  void onSendFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Sending message failed on topic %s to MQTT broker %s 
(%d)", topic_, uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for sending failure: %s", 
response->message);
-    }
+  std::chrono::seconds getSessionExpiryInterval() const override {
+    // non-persistent session as we only publish
+    return std::chrono::seconds{0};
   }
 
-  bool getCleanSession() const override {
-    return true;
+  void startupClient() override {
+    // there is no need to do anything like subscribe in the beginning
   }
 
-  bool startupClient() override {
-    // there is no need to do anything like subscribe on the beginning
-    return true;
-  }
+  void checkProperties() override;
+  void checkBrokerLimitsImpl() override;
 
   bool retain_ = false;
+  std::optional<std::chrono::seconds> message_expiry_interval_;
+  InFlightMessageCounter in_flight_message_counter_;

Review Comment:
   This may be a good candidate for a processor specific metric.



##########
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##########
@@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor {
     freeResources();
   }
 
+  SMART_ENUM(MqttVersions,
+    (V_3X_AUTO, "3.x AUTO"),
+    (V_3_1_0, "3.1.0"),
+    (V_3_1_1, "3.1.1"),
+    (V_5_0, "5.0"));
+
+  SMART_ENUM(MqttQoS,
+    (LEVEL_0, "0"),
+    (LEVEL_1, "1"),
+    (LEVEL_2, "2"));
+
   EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
+  EXTENSIONAPI static const core::Property QoS;
+  EXTENSIONAPI static const core::Property MqttVersion;
+  EXTENSIONAPI static const core::Property ConnectionTimeout;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+  EXTENSIONAPI static const core::Property LastWillContentType;
   EXTENSIONAPI static const core::Property Username;
   EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepAliveInterval;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
-  EXTENSIONAPI static const core::Property LastWillTopic;
-  EXTENSIONAPI static const core::Property LastWillMessage;
-  EXTENSIONAPI static const core::Property LastWillQoS;
-  EXTENSIONAPI static const core::Property LastWillRetain;
 
-  EXTENSIONAPI static auto properties() {
+
+  static auto basicProperties() {
+    return std::array{
+      BrokerURI,
+      ClientID,
+      MqttVersion
+    };
+  }
+
+  static auto advancedProperties() {
     return std::array{
-            BrokerURI,
-            Topic,
-            ClientID,
-            QoS,
-            ConnectionTimeout,
-            KeepAliveInterval,
-            MaxFlowSegSize,
-            LastWillTopic,
-            LastWillMessage,
-            LastWillQoS,
-            LastWillRetain,
-            Username,
-            Password,
-            SecurityProtocol,
-            SecurityCA,
-            SecurityCert,
-            SecurityPrivateKey,
-            SecurityPrivateKeyPassword
+      QoS,
+      ConnectionTimeout,
+      KeepAliveInterval,
+      LastWillTopic,
+      LastWillMessage,
+      LastWillQoS,
+      LastWillRetain,
+      LastWillContentType,
+      Username,
+      Password,
+      SecurityProtocol,
+      SecurityCA,
+      SecurityCert,
+      SecurityPrivateKey,
+      SecurityPrivateKeyPassword
     };
   }
 
   void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& factory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSession>& session) override;
 
   void notifyStop() override {
     freeResources();
   }
 
  protected:
+  struct MQTTMessageDeleter {
+    void operator()(MQTTAsync_message* message) {
+      MQTTAsync_freeMessage(&message);
+    }
+  };
+
+  struct SmartMessage {
+    std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> contents;
+    std::string topic;
+  };
+
+  // defined by Paho MQTT C library
+  static constexpr int PAHO_MQTT_C_FAILURE_CODE = -9999999;
+  static constexpr int MQTT_MAX_RECEIVE_MAXIMUM = 65535;
+
+  /**
+   * Connect to MQTT broker. Synchronously waits until connection succeeds or 
fails.
+   */
   void reconnect();
 
+  /**
+   * Checks property consistency before connecting to broker
+   */
+  virtual void checkProperties() {
+  }
+
+  /**
+   * Checks broker limits and supported features vs our desired features after 
connecting to broker
+   */
+  void checkBrokerLimits();
+  virtual void checkBrokerLimitsImpl() = 0;
+
+  // variables being used for a synchronous connection and disconnection
+  std::shared_mutex client_mutex_;
+
   MQTTAsync client_ = nullptr;
   std::string uri_;
-  std::string topic_;
   std::chrono::seconds keep_alive_interval_{60};
-  uint64_t max_seg_size_ = std::numeric_limits<uint64_t>::max();
-  std::chrono::seconds connection_timeout_{30};
-  uint32_t qos_ = MQTT_QOS_1;
+  std::chrono::seconds connection_timeout_{10};
+  MqttQoS qos_{MqttQoS::LEVEL_0};
   std::string clientID_;
   std::string username_;
   std::string password_;
+  MqttVersions mqtt_version_{MqttVersions::V_3X_AUTO};
 
- private:
-  // MQTT async callback
-  static int msgReceived(void *context, char* topic_name, int topic_len, 
MQTTAsync_message* message) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onMessageReceived(topic_name, topic_len, message);
-    return 1;
-  }
-
-  // MQTT async callback
-  static void connectionLost(void *context, char* cause) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionLost(cause);
-  }
-
-  // MQTT async callback
-  static void connectionSuccess(void* context, MQTTAsync_successData* 
response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionSuccess(response);
-  }
+  // Supported operations
+  std::optional<bool> retain_available_;
+  std::optional<bool> wildcard_subscription_available_;
+  std::optional<bool> shared_subscription_available_;
 
-  // MQTT async callback
-  static void connectionFailure(void* context, MQTTAsync_failureData* 
response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionFailure(response);
-  }
+  std::optional<uint16_t> broker_topic_alias_maximum_;
+  std::optional<uint16_t> broker_receive_maximum_;
+  std::optional<uint8_t> maximum_qos_;
+  std::optional<uint32_t> maximum_packet_size_;
 
-  // MQTT async callback
-  static void disconnectionSuccess(void* context, MQTTAsync_successData* 
response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionSuccess(response);
-  }
-
-  // MQTT async callback
-  static void disconnectionFailure(void* context, MQTTAsync_failureData* 
response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionFailure(response);
-  }
+  std::optional<std::chrono::seconds> maximum_session_expiry_interval_;
+  std::optional<std::chrono::seconds> server_keep_alive_;
 
-  virtual void onMessageReceived(char* topic_name, int /*topic_len*/, 
MQTTAsync_message* message) {
-    MQTTAsync_freeMessage(&message);
-    MQTTAsync_free(topic_name);
-  }
-
-  void onConnectionLost(char* cause) {
-    logger_->log_error("Connection lost to MQTT broker %s", uri_);
-    if (cause != nullptr) {
-      logger_->log_error("Cause for connection loss: %s", cause);
-    }
-  }
-
-  void onConnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully connected to MQTT broker %s", uri_);
-    startupClient();
-  }
+ private:
+  /**
+   * Initializes local MQTT client and connects to broker.
+   */
+  void initializeClient();
 
-  void onConnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Connection failed to MQTT broker %s (%d)", uri_, 
response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for connection failure: %s", 
response->message);
-    }
-  }
+  /**
+   * Calls disconnect() and releases local MQTT client
+   */
+  void freeResources();
 
-  void onDisconnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully disconnected from MQTT broker %s", uri_);
-  }
+  /**
+   * Disconnect from MQTT broker. Synchronously waits until disconnection 
succeeds or fails.
+   */
+  void disconnect();
+
+  virtual void readProperties(const std::shared_ptr<core::ProcessContext>& 
context) = 0;
+  virtual void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) = 0;
+  virtual void startupClient() = 0;
+  void setBrokerLimits(MQTTAsync_successData5* response);
+
+  // MQTT static async callbacks, calling their non-static counterparts with 
context being pointer to "this"
+  static void connectionLost(void *context, char* cause);
+  static void connectionSuccess(void* context, MQTTAsync_successData* 
response);
+  static void connectionSuccess5(void* context, MQTTAsync_successData5* 
response);
+  static void connectionFailure(void* context, MQTTAsync_failureData* 
response);
+  static void connectionFailure5(void* context, MQTTAsync_failureData5* 
response);
+  static int msgReceived(void *context, char* topic_name, int topic_len, 
MQTTAsync_message* message);
+
+  // MQTT async callback methods
+  void onConnectionLost(char* cause);
+  void onConnectFinished(MQTTAsync_successData* success_data, 
MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, 
MQTTAsync_failureData5* failure_data_5);
+  void onDisconnectFinished(MQTTAsync_successData* success_data, 
MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, 
MQTTAsync_failureData5* failure_data_5);
 
-  void onDisconnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Disconnection failed from MQTT broker %s (%d)", uri_, 
response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for disconnection failure: %s", 
response->message);
-    }
+  /**
+   * Called if message is received. This is default implementation, to be 
overridden if subclass wants to use the message.
+   * @param topic topic of message
+   * @param message MQTT message
+   */
+  virtual void onMessageReceived(SmartMessage /*smartmessage*/) {
   }
 
   virtual bool getCleanSession() const = 0;
-  virtual bool startupClient() = 0;
-
-  void freeResources();
-
-  /**
-   * Checks property consistency before connecting to broker
-   */
-  virtual void checkProperties() {
+  virtual bool getCleanStart() const = 0;
+  virtual std::chrono::seconds getSessionExpiryInterval() const = 0;
+  void setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, 
MQTTProperties& connect_props, MQTTProperties& will_props) const;
+  virtual void setMqtt5ConnectOptionsImpl(MQTTProperties& /*connect_props*/) 
const {

Review Comment:
   Could the `setMqtt5ConnectOptionsImpl` be removed and the 
`setMqtt5ConnectOptions` made virtual? In that case we could just override that 
if needed and call the base class's implementation additionally before adding 
our own implementation. I could work with `onTrigger` and `onTriggerImpl` as 
well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to