fgerlits commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1064808631


##########
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##########
@@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor {
     freeResources();
   }
 
+  SMART_ENUM(MqttVersions,
+    (V_3X_AUTO, "3.x AUTO"),
+    (V_3_1_0, "3.1.0"),
+    (V_3_1_1, "3.1.1"),
+    (V_5_0, "5.0"));
+
+  SMART_ENUM(MqttQoS,
+    (LEVEL_0, "0"),
+    (LEVEL_1, "1"),
+    (LEVEL_2, "2"));
+
   EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
+  EXTENSIONAPI static const core::Property QoS;
+  EXTENSIONAPI static const core::Property MqttVersion;
+  EXTENSIONAPI static const core::Property ConnectionTimeout;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+  EXTENSIONAPI static const core::Property LastWillContentType;
   EXTENSIONAPI static const core::Property Username;
   EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepAliveInterval;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
-  EXTENSIONAPI static const core::Property LastWillTopic;
-  EXTENSIONAPI static const core::Property LastWillMessage;
-  EXTENSIONAPI static const core::Property LastWillQoS;
-  EXTENSIONAPI static const core::Property LastWillRetain;
 
-  EXTENSIONAPI static auto properties() {
+
+  static auto basicProperties() {
+    return std::array{
+      BrokerURI,
+      ClientID,
+      MqttVersion
+    };
+  }
+
+  static auto advancedProperties() {
     return std::array{
-            BrokerURI,
-            Topic,
-            ClientID,
-            QoS,
-            ConnectionTimeout,
-            KeepAliveInterval,
-            MaxFlowSegSize,
-            LastWillTopic,
-            LastWillMessage,
-            LastWillQoS,
-            LastWillRetain,
-            Username,
-            Password,
-            SecurityProtocol,
-            SecurityCA,
-            SecurityCert,
-            SecurityPrivateKey,
-            SecurityPrivateKeyPassword
+      QoS,
+      ConnectionTimeout,
+      KeepAliveInterval,
+      LastWillTopic,
+      LastWillMessage,
+      LastWillQoS,
+      LastWillRetain,
+      LastWillContentType,
+      Username,
+      Password,
+      SecurityProtocol,
+      SecurityCA,
+      SecurityCert,
+      SecurityPrivateKey,
+      SecurityPrivateKeyPassword
     };
   }
 
   void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& factory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSession>& session) override;
 
   void notifyStop() override {
     freeResources();
   }
 
  protected:
+  struct MQTTMessageDeleter {
+    void operator()(MQTTAsync_message* message) {
+      MQTTAsync_freeMessage(&message);
+    }
+  };
+
+  struct SmartMessage {
+    std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> contents;
+    std::string topic;
+  };
+
+  // defined by Paho MQTT C library
+  static constexpr int PAHO_MQTT_C_FAILURE_CODE = -9999999;
+  static constexpr int MQTT_MAX_RECEIVE_MAXIMUM = 65535;
+
+  /**
+   * Connect to MQTT broker. Synchronously waits until connection succeeds or 
fails.
+   */
   void reconnect();
 
+  /**
+   * Checks property consistency before connecting to broker
+   */
+  virtual void checkProperties() {
+  }
+
+  /**
+   * Checks broker limits and supported features vs our desired features after 
connecting to broker
+   */
+  void checkBrokerLimits();
+  virtual void checkBrokerLimitsImpl() = 0;
+
+  // variables being used for a synchronous connection and disconnection
+  std::shared_mutex client_mutex_;
+
   MQTTAsync client_ = nullptr;
   std::string uri_;
-  std::string topic_;
   std::chrono::seconds keep_alive_interval_{60};
-  uint64_t max_seg_size_ = std::numeric_limits<uint64_t>::max();
-  std::chrono::seconds connection_timeout_{30};
-  uint32_t qos_ = MQTT_QOS_1;
+  std::chrono::seconds connection_timeout_{10};
+  MqttQoS qos_{MqttQoS::LEVEL_0};
   std::string clientID_;
   std::string username_;
   std::string password_;
+  MqttVersions mqtt_version_{MqttVersions::V_3X_AUTO};
 
- private:
-  // MQTT async callback
-  static int msgReceived(void *context, char* topic_name, int topic_len, 
MQTTAsync_message* message) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onMessageReceived(topic_name, topic_len, message);
-    return 1;
-  }
-
-  // MQTT async callback
-  static void connectionLost(void *context, char* cause) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionLost(cause);
-  }
-
-  // MQTT async callback
-  static void connectionSuccess(void* context, MQTTAsync_successData* 
response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionSuccess(response);
-  }
+  // Supported operations
+  std::optional<bool> retain_available_;
+  std::optional<bool> wildcard_subscription_available_;
+  std::optional<bool> shared_subscription_available_;
 
-  // MQTT async callback
-  static void connectionFailure(void* context, MQTTAsync_failureData* 
response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onConnectionFailure(response);
-  }
+  std::optional<uint16_t> broker_topic_alias_maximum_;
+  std::optional<uint16_t> broker_receive_maximum_;
+  std::optional<uint8_t> maximum_qos_;
+  std::optional<uint32_t> maximum_packet_size_;
 
-  // MQTT async callback
-  static void disconnectionSuccess(void* context, MQTTAsync_successData* 
response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionSuccess(response);
-  }
-
-  // MQTT async callback
-  static void disconnectionFailure(void* context, MQTTAsync_failureData* 
response) {
-    auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
-    processor->onDisconnectionFailure(response);
-  }
+  std::optional<std::chrono::seconds> maximum_session_expiry_interval_;
+  std::optional<std::chrono::seconds> server_keep_alive_;
 
-  virtual void onMessageReceived(char* topic_name, int /*topic_len*/, 
MQTTAsync_message* message) {
-    MQTTAsync_freeMessage(&message);
-    MQTTAsync_free(topic_name);
-  }
-
-  void onConnectionLost(char* cause) {
-    logger_->log_error("Connection lost to MQTT broker %s", uri_);
-    if (cause != nullptr) {
-      logger_->log_error("Cause for connection loss: %s", cause);
-    }
-  }
-
-  void onConnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully connected to MQTT broker %s", uri_);
-    startupClient();
-  }
+ private:
+  /**
+   * Initializes local MQTT client and connects to broker.
+   */
+  void initializeClient();
 
-  void onConnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Connection failed to MQTT broker %s (%d)", uri_, 
response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for connection failure: %s", 
response->message);
-    }
-  }
+  /**
+   * Calls disconnect() and releases local MQTT client
+   */
+  void freeResources();
 
-  void onDisconnectionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully disconnected from MQTT broker %s", uri_);
-  }
+  /**
+   * Disconnect from MQTT broker. Synchronously waits until disconnection 
succeeds or fails.
+   */
+  void disconnect();
+
+  virtual void readProperties(const std::shared_ptr<core::ProcessContext>& 
context) = 0;
+  virtual void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) = 0;
+  virtual void startupClient() = 0;
+  void setBrokerLimits(MQTTAsync_successData5* response);
+
+  // MQTT static async callbacks, calling their non-static counterparts with 
context being pointer to "this"
+  static void connectionLost(void *context, char* cause);
+  static void connectionSuccess(void* context, MQTTAsync_successData* 
response);
+  static void connectionSuccess5(void* context, MQTTAsync_successData5* 
response);
+  static void connectionFailure(void* context, MQTTAsync_failureData* 
response);
+  static void connectionFailure5(void* context, MQTTAsync_failureData5* 
response);
+  static int msgReceived(void *context, char* topic_name, int topic_len, 
MQTTAsync_message* message);
+
+  // MQTT async callback methods
+  void onConnectionLost(char* cause);
+  void onConnectFinished(MQTTAsync_successData* success_data, 
MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, 
MQTTAsync_failureData5* failure_data_5);
+  void onDisconnectFinished(MQTTAsync_successData* success_data, 
MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, 
MQTTAsync_failureData5* failure_data_5);
 
-  void onDisconnectionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Disconnection failed from MQTT broker %s (%d)", uri_, 
response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for disconnection failure: %s", 
response->message);
-    }
+  /**
+   * Called if message is received. This is default implementation, to be 
overridden if subclass wants to use the message.
+   * @param topic topic of message
+   * @param message MQTT message
+   */
+  virtual void onMessageReceived(SmartMessage /*smartmessage*/) {
   }
 
   virtual bool getCleanSession() const = 0;
-  virtual bool startupClient() = 0;
-
-  void freeResources();
-
-  /**
-   * Checks property consistency before connecting to broker
-   */
-  virtual void checkProperties() {
+  virtual bool getCleanStart() const = 0;
+  virtual std::chrono::seconds getSessionExpiryInterval() const = 0;
+  void setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, 
MQTTProperties& connect_props, MQTTProperties& will_props) const;
+  virtual void setMqtt5ConnectOptionsImpl(MQTTProperties& /*connect_props*/) 
const {

Review Comment:
   I have renamed the function to `setProcessorSpecificMqtt5ConnectOptions` in 
commit 7e4b4c3ce09969316a03a6a1182e3e540dc645ce.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to