adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041268666


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, 
MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = 
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& 
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = 
context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& 
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const 
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message->payload), 
gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
flow_file->getUUIDStr());
+      session->remove(flow_file);
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  std::queue<SmartMessage> msg_queue;
+  SmartMessage message;
+  while (queue_.try_dequeue(message)) {
+    msg_queue.push(std::move(message));
+  }
+  return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const 
std::shared_ptr<io::OutputStream>& stream) {
+  if (message_.contents->payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", 
message_.contents->payloadlen);
+    return -1;
+  }
+
+  const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), 
gsl::narrow<size_t>(message_.contents->payloadlen));
+  if (io::isError(len)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+
+  return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, 
const std::shared_ptr<core::FlowFile>& flow_file, const 
std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {
+    return;
   }
+
+  const auto property_count = 
MQTTProperties_propertyCount(&message.contents->properties, 
MQTTPROPERTY_CODE_USER_PROPERTY);
+  for (int i=0; i < property_count; ++i) {
+    MQTTProperty* property = 
MQTTProperties_getPropertyAt(&message.contents->properties, 
MQTTPROPERTY_CODE_USER_PROPERTY, i);

Review Comment:
   This `i` index can only refer to user properties. Example: if there are 10 
properties, with 3 user properties being at index 3, 5 and 7, then 
`MQTTProperties_propertyCount` will return 3. `MQTTProperties_getPropertyAt` 
called with `MQTTPROPERTY_CODE_USER_PROPERTY` and indices 0, 1 and 2 will 
respectively return properties at index 3, 5 and 7. Source: 
https://github.com/eclipse/paho.mqtt.c/blob/master/src/MQTTProperties.c#L524



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, 
MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = 
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& 
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = 
context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& 
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const 
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message->payload), 
gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
flow_file->getUUIDStr());
+      session->remove(flow_file);
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  std::queue<SmartMessage> msg_queue;
+  SmartMessage message;
+  while (queue_.try_dequeue(message)) {
+    msg_queue.push(std::move(message));
+  }
+  return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const 
std::shared_ptr<io::OutputStream>& stream) {
+  if (message_.contents->payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", 
message_.contents->payloadlen);
+    return -1;
+  }
+
+  const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), 
gsl::narrow<size_t>(message_.contents->payloadlen));
+  if (io::isError(len)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+
+  return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, 
const std::shared_ptr<core::FlowFile>& flow_file, const 
std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {
+    return;
   }
+
+  const auto property_count = 
MQTTProperties_propertyCount(&message.contents->properties, 
MQTTPROPERTY_CODE_USER_PROPERTY);
+  for (int i=0; i < property_count; ++i) {
+    MQTTProperty* property = 
MQTTProperties_getPropertyAt(&message.contents->properties, 
MQTTPROPERTY_CODE_USER_PROPERTY, i);

Review Comment:
   This `i` index can only refer to user properties. Example: if there are 10 
properties, with 3 user properties being at index 3, 5 and 7, then 
`MQTTProperties_propertyCount` will return 3. `MQTTProperties_getPropertyAt` 
called with `MQTTPROPERTY_CODE_USER_PROPERTY` and indices 0, 1 and 2 will 
respectively return properties at index 3, 5 and 7. Source: 
https://github.com/eclipse/paho.mqtt.c/blob/master/src/MQTTProperties.c#L524



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to