fgerlits commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1024188173
##########
PROCESSORS.md:
##########
@@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a
MQTT broker for a specifie
In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
-| Name | Default Value | Allowable Values | Description
|
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
-| **Broker URI** | | | The URI to use to
connect to the MQTT broker
|
-| **Topic** | | | The topic to
subscribe to
|
-| Client ID | | | MQTT client ID to
use
|
-| Quality of Service | 0 | | The Quality of
Service (QoS) to receive the message with. Accepts three values '0', '1' and
'2' |
-| Connection Timeout | 30 sec | | Maximum time
interval the client will wait for the network connection to the MQTT broker
|
-| Keep Alive Interval | 60 sec | | Defines the
maximum time interval between messages being sent to the broker
|
-| Max Flow Segment Size | | | Maximum flow
content payload segment size for the MQTT record
|
-| Last Will Topic | | | The topic to send
the client's Last Will to. If the Last Will topic is not set then a Last Will
will not be sent |
-| Last Will Message | | | The message to
send as the client's Last Will. If the Last Will Message is empty, Last Will
will be deleted from the broker |
-| Last Will QoS | 0 | | The Quality of
Service (QoS) to send the last will with. Accepts three values '0', '1' and '2'
|
-| Last Will Retain | false | | Whether to retain
the client's Last Will
|
-| Security Protocol | | | Protocol used to
communicate with brokers
|
-| Security CA | | | File or directory
path to CA certificate(s) for verifying the broker's key
|
-| Security Cert | | | Path to client's
public key (PEM) used for authentication
|
-| Security Private Key | | | Path to client's
private key (PEM) used for authentication
|
-| Security Pass Phrase | | | Private key
passphrase
|
-| Username | | | Username to use
when connecting to the broker
|
-| Password | | | Password to use
when connecting to the broker
|
-| Clean Session | true | | Whether to start
afresh rather than remembering previous subscriptions
|
-| Queue Max Message | 1000 | | Maximum number of
messages allowed on the received MQTT queue
|
+| Name | Default Value | Allowable Values |
Description
|
+|-----------------------------|---------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI** | | |
The URI to use to connect to the MQTT broker
|
+| Client ID | | |
MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!
|
+| MQTT Version | 3.x AUTO | 3.x AUTO, 3.1.0, 3.1.1, 5.0 |
The MQTT specification version when connecting to the broker.
|
+| **Topic** | | |
The topic to subscribe to.
|
+| Clean Session | true | |
Whether to start afresh rather than remembering previous subscriptions. Also
make broker remember subscriptions after disconnected. WARNING: MQTT 3.x only. |
+| Clean Start | true | |
Whether to start afresh rather than remembering previous subscriptions.
WARNING: MQTT 5.x only.
|
Review Comment:
I think all-caps WARNING is too much:
```suggestion
| Clean Start | true |
| Whether to start afresh rather than remembering previous subscriptions. (MQTT
5.x only) |
```
(also elsewhere)
##########
libminifi/include/utils/Enum.h:
##########
@@ -39,6 +39,7 @@ namespace utils {
constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_}
{} \
explicit Clazz(const char* str) : value_{parse(str).value_} {} \
+ explicit Clazz(std::nullptr_t) = delete; \
Review Comment:
what does this do?
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message,
MQTTMessageDeleter> message) {
- if (queue_.size_approx() >= maxQueueSize_) {
- logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+ if (queue_.size_approx() >= max_queue_size_) {
+ logger_->log_error("MQTT queue full");
return;
}
- if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
- logger_->log_debug("MQTT message was truncated while enqueuing, original
length: %d", message->payloadlen);
- message->payloadlen = gsl::narrow<int>(max_seg_size_);
- }
-
- logger_->log_debug("enqueuing MQTT message with length %d",
message->payloadlen);
+ logger_->log_debug("enqueuing MQTT message with length %d",
message.contents->payloadlen);
queue_.enqueue(std::move(message));
}
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (auto value = context->getProperty(Topic)) {
+ topic_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
if (const auto value = context->getProperty<bool>(CleanSession)) {
- cleanSession_ = *value;
- logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+ clean_session_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+ if (const auto value = context->getProperty<bool>(CleanStart)) {
+ clean_start_ = *value;
+ }
+ logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+ if (const auto session_expiry_interval =
context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+ session_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+ }
+ logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s",
int64_t{session_expiry_interval_.count()});
if (const auto value =
context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
- maxQueueSize_ = *value;
- logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
maxQueueSize_);
+ max_queue_size_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
max_queue_size_);
- // this connects to broker, so properties of this processor must be read
before
- AbstractMQTTProcessor::onSchedule(context, factory);
-}
+ if (auto value = context->getProperty(AttributeFromContentType)) {
+ attribute_from_content_type_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]",
attribute_from_content_type_);
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+ if (const auto topic_alias_maximum =
context->getProperty<uint32_t>(TopicAliasMaximum)) {
+ topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]",
topic_alias_maximum_);
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not consume from MQTT broker because
disconnected to %s", uri_);
- yield();
- return;
+ if (const auto receive_maximum =
context->getProperty<uint32_t>(ReceiveMaximum)) {
+ receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
}
+ logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]",
receive_maximum_);
+}
- std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
- getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+ std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
while (!msg_queue.empty()) {
const auto& message = msg_queue.front();
- std::shared_ptr<core::FlowFile> processFlowFile = session->create();
- int write_status{};
- session->write(processFlowFile, [&message, &write_status](const
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
- if (message->payloadlen < 0) {
- write_status = -1;
- return -1;
- }
- const auto len =
stream->write(reinterpret_cast<uint8_t*>(message->payload),
gsl::narrow<size_t>(message->payloadlen));
- if (io::isError(len)) {
- write_status = -1;
- return -1;
- }
- return gsl::narrow<int64_t>(len);
- });
- if (write_status < 0) {
- logger_->log_error("ConsumeMQTT fail for the flow with UUID %s",
processFlowFile->getUUIDStr());
- session->remove(processFlowFile);
+ std::shared_ptr<core::FlowFile> flow_file = session->create();
+ WriteCallback write_callback(message, logger_);
+ try {
+ session->write(flow_file, write_callback);
Review Comment:
I don't think this works, as `write()` will work on a temporary
`std::function` containing a copy of `write_callback`, so `success_status_`
won't be set.
```suggestion
session->write(flow_file, std::ref(write_callback));
```
##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
}
logger_->log_info("Reconnecting to %s", uri_);
- int ret = MQTTAsync_connect(client_, &conn_opts);
+ if (MQTTAsync_isConnected(client_)) {
+ logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+ return;
+ }
+ const int ret = MQTTAsync_connect(client_, &conn_opts);
+ MQTTProperties_free(&connect_props);
if (ret != MQTTASYNC_SUCCESS) {
- logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_,
ret);
+ logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error
code [%d]", uri_, ret);
+ return;
}
+
+ // wait until connection succeeds or fails
+ connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions&
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+ conn_opts.cleanstart = getCleanStart();
+
+ {
+ MQTTProperty property;
+ property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+ property.value.integer4 =
gsl::narrow<int>(getSessionExpiryInterval().count());
+ MQTTProperties_add(&connect_props, &property);
+ }
Review Comment:
I don't think `MQTTProperties_add()` copies the property object: it just
copies the `&property` pointer into `connect_props->array`. So `connect_props`
will contain a pointer to this object on the stack which will have been rolled
back by the time we use `connect_props`.
(the same problem happens elsewhere, too, eg. in
`ConsumeMQTT::setMqtt5ConnectOptionsImpl()` and
`PublishMQTT::setMqtt5Properties()`)
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message,
MQTTMessageDeleter> message) {
- if (queue_.size_approx() >= maxQueueSize_) {
- logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+ if (queue_.size_approx() >= max_queue_size_) {
+ logger_->log_error("MQTT queue full");
return;
}
- if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
- logger_->log_debug("MQTT message was truncated while enqueuing, original
length: %d", message->payloadlen);
- message->payloadlen = gsl::narrow<int>(max_seg_size_);
- }
-
- logger_->log_debug("enqueuing MQTT message with length %d",
message->payloadlen);
+ logger_->log_debug("enqueuing MQTT message with length %d",
message.contents->payloadlen);
queue_.enqueue(std::move(message));
}
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (auto value = context->getProperty(Topic)) {
+ topic_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
if (const auto value = context->getProperty<bool>(CleanSession)) {
- cleanSession_ = *value;
- logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+ clean_session_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+ if (const auto value = context->getProperty<bool>(CleanStart)) {
+ clean_start_ = *value;
+ }
+ logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+ if (const auto session_expiry_interval =
context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+ session_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+ }
+ logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s",
int64_t{session_expiry_interval_.count()});
if (const auto value =
context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
- maxQueueSize_ = *value;
- logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
maxQueueSize_);
+ max_queue_size_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
max_queue_size_);
- // this connects to broker, so properties of this processor must be read
before
- AbstractMQTTProcessor::onSchedule(context, factory);
-}
+ if (auto value = context->getProperty(AttributeFromContentType)) {
+ attribute_from_content_type_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]",
attribute_from_content_type_);
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+ if (const auto topic_alias_maximum =
context->getProperty<uint32_t>(TopicAliasMaximum)) {
+ topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]",
topic_alias_maximum_);
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not consume from MQTT broker because
disconnected to %s", uri_);
- yield();
- return;
+ if (const auto receive_maximum =
context->getProperty<uint32_t>(ReceiveMaximum)) {
+ receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
}
+ logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]",
receive_maximum_);
+}
- std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
- getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+ std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
while (!msg_queue.empty()) {
const auto& message = msg_queue.front();
- std::shared_ptr<core::FlowFile> processFlowFile = session->create();
- int write_status{};
- session->write(processFlowFile, [&message, &write_status](const
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
- if (message->payloadlen < 0) {
- write_status = -1;
- return -1;
- }
- const auto len =
stream->write(reinterpret_cast<uint8_t*>(message->payload),
gsl::narrow<size_t>(message->payloadlen));
- if (io::isError(len)) {
- write_status = -1;
- return -1;
- }
- return gsl::narrow<int64_t>(len);
- });
- if (write_status < 0) {
- logger_->log_error("ConsumeMQTT fail for the flow with UUID %s",
processFlowFile->getUUIDStr());
- session->remove(processFlowFile);
+ std::shared_ptr<core::FlowFile> flow_file = session->create();
+ WriteCallback write_callback(message, logger_);
+ try {
+ session->write(flow_file, write_callback);
+ } catch (const Exception& ex) {
+ logger_->log_error("Error when processing message queue: %s", ex.what());
+ }
+ if (!write_callback.getSuccessStatus()) {
+ logger_->log_error("ConsumeMQTT fail for the flow with UUID %s",
flow_file->getUUIDStr());
+ session->remove(flow_file);
} else {
- session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
- session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
- logger_->log_debug("ConsumeMQTT processing success for the flow with
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
- session->transfer(processFlowFile, Success);
+ putUserPropertiesAsAttributes(message, flow_file, session);
+ session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+ session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+ fillAttributeFromContentType(message, flow_file, session);
+ logger_->log_debug("ConsumeMQTT processing success for the flow with
UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+ session->transfer(flow_file, Success);
}
- msg_queue.pop_front();
+ msg_queue.pop();
+ }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+ std::queue<SmartMessage> msg_queue;
+ SmartMessage message;
+ while (queue_.try_dequeue(message)) {
+ msg_queue.push(std::move(message));
+ }
+ return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const
std::shared_ptr<io::OutputStream>& stream) {
+ if (message_.contents->payloadlen < 0) {
+ success_status_ = false;
+ logger_->log_error("Payload length of message is negative, value is [%d]",
message_.contents->payloadlen);
+ return -1;
+ }
+
+ const auto len =
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload),
gsl::narrow<size_t>(message_.contents->payloadlen));
+ if (io::isError(len)) {
+ success_status_ = false;
+ logger_->log_error("Stream writing error when processing message");
+ return -1;
+ }
+
+ return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message,
const std::shared_ptr<core::FlowFile>& flow_file, const
std::shared_ptr<core::ProcessSession>& session) const {
+ if (mqtt_version_.value() != MqttVersions::V_5_0) {
+ return;
}
+
+ const auto property_count =
MQTTProperties_propertyCount(&message.contents->properties,
MQTTPROPERTY_CODE_USER_PROPERTY);
+ for (int i=0; i < property_count; ++i) {
+ MQTTProperty* property =
MQTTProperties_getPropertyAt(&message.contents->properties,
MQTTPROPERTY_CODE_USER_PROPERTY, i);
Review Comment:
this can return null if the `i`th property is not a user property (ie., it's
an integer property) -- are we sure that can never happen?
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message,
MQTTMessageDeleter> message) {
- if (queue_.size_approx() >= maxQueueSize_) {
- logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+ if (queue_.size_approx() >= max_queue_size_) {
+ logger_->log_error("MQTT queue full");
return;
}
- if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
- logger_->log_debug("MQTT message was truncated while enqueuing, original
length: %d", message->payloadlen);
- message->payloadlen = gsl::narrow<int>(max_seg_size_);
- }
-
- logger_->log_debug("enqueuing MQTT message with length %d",
message->payloadlen);
+ logger_->log_debug("enqueuing MQTT message with length %d",
message.contents->payloadlen);
queue_.enqueue(std::move(message));
}
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (auto value = context->getProperty(Topic)) {
+ topic_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
if (const auto value = context->getProperty<bool>(CleanSession)) {
- cleanSession_ = *value;
- logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+ clean_session_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+ if (const auto value = context->getProperty<bool>(CleanStart)) {
+ clean_start_ = *value;
+ }
+ logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+ if (const auto session_expiry_interval =
context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+ session_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+ }
+ logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s",
int64_t{session_expiry_interval_.count()});
if (const auto value =
context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
- maxQueueSize_ = *value;
- logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
maxQueueSize_);
+ max_queue_size_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
max_queue_size_);
- // this connects to broker, so properties of this processor must be read
before
- AbstractMQTTProcessor::onSchedule(context, factory);
-}
+ if (auto value = context->getProperty(AttributeFromContentType)) {
+ attribute_from_content_type_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]",
attribute_from_content_type_);
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+ if (const auto topic_alias_maximum =
context->getProperty<uint32_t>(TopicAliasMaximum)) {
+ topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]",
topic_alias_maximum_);
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not consume from MQTT broker because
disconnected to %s", uri_);
- yield();
- return;
+ if (const auto receive_maximum =
context->getProperty<uint32_t>(ReceiveMaximum)) {
+ receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
}
+ logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]",
receive_maximum_);
+}
- std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
- getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+ std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
while (!msg_queue.empty()) {
const auto& message = msg_queue.front();
- std::shared_ptr<core::FlowFile> processFlowFile = session->create();
- int write_status{};
- session->write(processFlowFile, [&message, &write_status](const
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
- if (message->payloadlen < 0) {
- write_status = -1;
- return -1;
- }
- const auto len =
stream->write(reinterpret_cast<uint8_t*>(message->payload),
gsl::narrow<size_t>(message->payloadlen));
- if (io::isError(len)) {
- write_status = -1;
- return -1;
- }
- return gsl::narrow<int64_t>(len);
- });
- if (write_status < 0) {
- logger_->log_error("ConsumeMQTT fail for the flow with UUID %s",
processFlowFile->getUUIDStr());
- session->remove(processFlowFile);
+ std::shared_ptr<core::FlowFile> flow_file = session->create();
+ WriteCallback write_callback(message, logger_);
+ try {
+ session->write(flow_file, write_callback);
+ } catch (const Exception& ex) {
+ logger_->log_error("Error when processing message queue: %s", ex.what());
+ }
+ if (!write_callback.getSuccessStatus()) {
+ logger_->log_error("ConsumeMQTT fail for the flow with UUID %s",
flow_file->getUUIDStr());
+ session->remove(flow_file);
} else {
- session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
- session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
- logger_->log_debug("ConsumeMQTT processing success for the flow with
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
- session->transfer(processFlowFile, Success);
+ putUserPropertiesAsAttributes(message, flow_file, session);
+ session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+ session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+ fillAttributeFromContentType(message, flow_file, session);
+ logger_->log_debug("ConsumeMQTT processing success for the flow with
UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+ session->transfer(flow_file, Success);
}
- msg_queue.pop_front();
+ msg_queue.pop();
+ }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+ std::queue<SmartMessage> msg_queue;
+ SmartMessage message;
+ while (queue_.try_dequeue(message)) {
+ msg_queue.push(std::move(message));
+ }
+ return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const
std::shared_ptr<io::OutputStream>& stream) {
+ if (message_.contents->payloadlen < 0) {
+ success_status_ = false;
+ logger_->log_error("Payload length of message is negative, value is [%d]",
message_.contents->payloadlen);
+ return -1;
+ }
+
+ const auto len =
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload),
gsl::narrow<size_t>(message_.contents->payloadlen));
+ if (io::isError(len)) {
+ success_status_ = false;
+ logger_->log_error("Stream writing error when processing message");
+ return -1;
+ }
+
+ return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message,
const std::shared_ptr<core::FlowFile>& flow_file, const
std::shared_ptr<core::ProcessSession>& session) const {
+ if (mqtt_version_.value() != MqttVersions::V_5_0) {
+ return;
}
+
+ const auto property_count =
MQTTProperties_propertyCount(&message.contents->properties,
MQTTPROPERTY_CODE_USER_PROPERTY);
+ for (int i=0; i < property_count; ++i) {
+ MQTTProperty* property =
MQTTProperties_getPropertyAt(&message.contents->properties,
MQTTPROPERTY_CODE_USER_PROPERTY, i);
+ std::string key(property->value.data.data, property->value.data.len);
+ std::string value(property->value.value.data, property->value.value.len);
+ session->putAttribute(flow_file, key, value);
+ }
+}
+
+void ConsumeMQTT::fillAttributeFromContentType(const SmartMessage& message,
const std::shared_ptr<core::FlowFile>& flow_file, const
std::shared_ptr<core::ProcessSession>& session) const {
+ if (mqtt_version_.value() != MqttVersions::V_5_0 ||
attribute_from_content_type_.empty()) {
+ return;
+ }
+
+ MQTTProperty* property =
MQTTProperties_getProperty(&message.contents->properties,
MQTTPROPERTY_CODE_CONTENT_TYPE);
+ if (property == nullptr) {
+ return;
+ }
+
+ std::string content_type(property->value.data.data,
property->value.data.len);
+ session->putAttribute(flow_file, attribute_from_content_type_, content_type);
}
-bool ConsumeMQTT::startupClient() {
+void ConsumeMQTT::startupClient() {
MQTTAsync_responseOptions response_options =
MQTTAsync_responseOptions_initializer;
response_options.context = this;
- response_options.onSuccess = subscriptionSuccess;
- response_options.onFailure = subscriptionFailure;
- const int ret = MQTTAsync_subscribe(client_, topic_.c_str(),
gsl::narrow<int>(qos_), &response_options);
+
+ if (mqtt_version_.value() == MqttVersions::V_5_0) {
+ response_options.onSuccess5 = subscriptionSuccess5;
+ response_options.onFailure5 = subscriptionFailure5;
+ } else {
+ response_options.onSuccess = subscriptionSuccess;
+ response_options.onFailure = subscriptionFailure;
+ }
+
+ const int ret = MQTTAsync_subscribe(client_, topic_.c_str(),
gsl::narrow<int>(qos_.value()), &response_options);
if (ret != MQTTASYNC_SUCCESS) {
logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_,
ret);
- return false;
+ return;
}
logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
- return true;
}
-void ConsumeMQTT::onMessageReceived(char* topic_name, int /*topic_len*/,
MQTTAsync_message* message) {
- MQTTAsync_free(topic_name);
+void ConsumeMQTT::onMessageReceived(SmartMessage smart_message) {
+ if (mqtt_version_ == MqttVersions::V_5_0) {
+ resolveTopicFromAlias(smart_message);
+ }
+
+ if (smart_message.topic.empty()) {
+ logger_->log_error("Received message without topic");
+ return;
+ }
+
+ enqueueReceivedMQTTMsg(std::move(smart_message));
+}
+
+void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) {
+ auto raw_alias =
MQTTProperties_getNumericValue(&smart_message.contents->properties,
MQTTPROPERTY_CODE_TOPIC_ALIAS);
+
+ std::optional<uint16_t> alias;
+ if (raw_alias != PAHO_MQTT_C_FAILURE_CODE) {
+ alias = gsl::narrow<uint16_t>(raw_alias);
+ }
+
+ auto& topic = smart_message.topic;
- const auto* msgPayload = reinterpret_cast<const char*>(message->payload);
- const size_t msgLen = message->payloadlen;
- const std::string messageText(msgPayload, msgLen);
- logger_->log_debug("Received message \"%s\" to MQTT topic %s on broker %s",
messageText, topic_, uri_);
+ if (alias.has_value()) {
+ if (*alias > topic_alias_maximum_) {
+ logger_->log_error("Broker does not respect client's Topic Alias
Maximum, sent a greater value: %" PRIu16 " > %" PRIu16, *alias,
topic_alias_maximum_);
+ return;
+ }
- std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> smartMessage(message);
- enqueueReceivedMQTTMsg(std::move(smartMessage));
+ // if topic is empty, this is just a usage of a previously stored alias
(look it up), otherwise a new one (store it)
+ if (topic.empty()) {
+ const auto iter = alias_to_topic_.find(*alias);
+ if (iter == alias_to_topic_.end()) {
+ logger_->log_error("Broker sent an alias that was not known to client
before: %" PRIu16, *alias);
+ } else {
+ topic = iter->second;
+ }
+ } else {
+ alias_to_topic_[*alias] = topic;
+ }
+ } else if (topic.empty()) {
+ logger_->log_error("Received message without topic and alias");
+ }
}
void ConsumeMQTT::checkProperties() {
- if (!cleanSession_ && clientID_.empty()) {
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client
ID for durable (non-clean) sessions");
+ if (mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ ==
MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO) {
+ if (isPropertyExplicitlySet(CleanStart)) {
+ logger_->log_warn("MQTT 3.x specification does not support Clean Start.
Property is not used.");
+ }
+ if (isPropertyExplicitlySet(SessionExpiryInterval)) {
+ logger_->log_warn("MQTT 3.x specification does not support Session
Expiry Intervals. Property is not used.");
+ }
+ if (isPropertyExplicitlySet(AttributeFromContentType)) {
+ logger_->log_warn("MQTT 3.x specification does not support Content Types
and thus attributes cannot be created from them. Property is not used.");
+ }
+ if (isPropertyExplicitlySet(TopicAliasMaximum)) {
+ logger_->log_warn("MQTT 3.x specification does not support Topic Alias
Maximum. Property is not used.");
+ }
+ if (isPropertyExplicitlySet(ReceiveMaximum)) {
+ logger_->log_warn("MQTT 3.x specification does not support Receive
Maximum. Property is not used.");
+ }
+ }
+
+ if (mqtt_version_.value() == MqttVersions::V_5_0 &&
isPropertyExplicitlySet(CleanSession)) {
+ logger_->log_warn("MQTT 5.0 specification does not support Clean Session.
Property is not used.");
+ }
+
+ if (clientID_.empty()) {
+ if (mqtt_version_.value() == MqttVersions::V_5_0) {
+ if (session_expiry_interval_ > std::chrono::seconds(0)) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a
Client ID for durable (Session Expiry Interval > 0) sessions");
+ }
+ } else if (!clean_session_) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a
Client ID for durable (non-clean) sessions");
+ }
+ }
+
+ if (qos_ == MqttQoS::LEVEL_0) {
+ if (mqtt_version_.value() == MqttVersions::V_5_0) {
+ if (session_expiry_interval_ > std::chrono::seconds(0)) {
+ logger_->log_warn("Messages are not preserved during client
disconnection "
+ "by the broker when QoS is less than 1 for durable
(Session Expiry Interval > 0) sessions. Only subscriptions are preserved.");
+ }
+ } else if (!clean_session_) {
+ logger_->log_warn("Messages are not preserved during client
disconnection "
+ "by the broker when QoS is less than 1 for durable
(non-clean) sessions. Only subscriptions are preserved.");
+ }
+ }
+}
+
+void ConsumeMQTT::checkBrokerLimitsImpl() {
+ auto hasWildcards = [] (std::string_view topic) {
+ return std::any_of(topic.begin(), topic.end(), [] (const char ch) {return
ch == '+' || ch == '#';});
+ };
+
+ if (wildcard_subscription_available_ == false && hasWildcards(topic_)) {
+ std::ostringstream os;
+ os << "Broker does not support wildcards but topic \"" << topic_ <<"\" has
them";
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+ }
+
+ if (maximum_session_expiry_interval_.has_value() && session_expiry_interval_
> maximum_session_expiry_interval_) {
+ std::ostringstream os;
+ os << "Set Session Expiry Interval (" << session_expiry_interval_.count()
<<" s) is longer then maximum supported by broker (" <<
maximum_session_expiry_interval_->count() << " s).";
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+ }
+
+ if (utils::StringUtils::startsWith(topic_, "$share/")) {
+ if (mqtt_version_.value() == MqttVersions::V_5_0) {
+ // shared topic are supported on MQTT 5, unless explicitly denied by
broker
+ if (shared_subscription_available_.has_value() &&
!*shared_subscription_available_) {
Review Comment:
only a suggestion, but this would be more readable to me:
```suggestion
if (shared_subscription_available_ == false) {
```
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message,
MQTTMessageDeleter> message) {
- if (queue_.size_approx() >= maxQueueSize_) {
- logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+ if (queue_.size_approx() >= max_queue_size_) {
+ logger_->log_error("MQTT queue full");
return;
}
- if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
- logger_->log_debug("MQTT message was truncated while enqueuing, original
length: %d", message->payloadlen);
- message->payloadlen = gsl::narrow<int>(max_seg_size_);
- }
-
- logger_->log_debug("enqueuing MQTT message with length %d",
message->payloadlen);
+ logger_->log_debug("enqueuing MQTT message with length %d",
message.contents->payloadlen);
queue_.enqueue(std::move(message));
}
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (auto value = context->getProperty(Topic)) {
+ topic_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
if (const auto value = context->getProperty<bool>(CleanSession)) {
- cleanSession_ = *value;
- logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+ clean_session_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+ if (const auto value = context->getProperty<bool>(CleanStart)) {
+ clean_start_ = *value;
+ }
+ logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+ if (const auto session_expiry_interval =
context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+ session_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+ }
+ logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s",
int64_t{session_expiry_interval_.count()});
if (const auto value =
context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
- maxQueueSize_ = *value;
- logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
maxQueueSize_);
+ max_queue_size_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
max_queue_size_);
- // this connects to broker, so properties of this processor must be read
before
- AbstractMQTTProcessor::onSchedule(context, factory);
-}
+ if (auto value = context->getProperty(AttributeFromContentType)) {
+ attribute_from_content_type_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]",
attribute_from_content_type_);
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+ if (const auto topic_alias_maximum =
context->getProperty<uint32_t>(TopicAliasMaximum)) {
+ topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]",
topic_alias_maximum_);
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not consume from MQTT broker because
disconnected to %s", uri_);
- yield();
- return;
+ if (const auto receive_maximum =
context->getProperty<uint32_t>(ReceiveMaximum)) {
+ receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
}
+ logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]",
receive_maximum_);
+}
- std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
- getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+ std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
while (!msg_queue.empty()) {
const auto& message = msg_queue.front();
- std::shared_ptr<core::FlowFile> processFlowFile = session->create();
- int write_status{};
- session->write(processFlowFile, [&message, &write_status](const
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
- if (message->payloadlen < 0) {
- write_status = -1;
- return -1;
- }
- const auto len =
stream->write(reinterpret_cast<uint8_t*>(message->payload),
gsl::narrow<size_t>(message->payloadlen));
- if (io::isError(len)) {
- write_status = -1;
- return -1;
- }
- return gsl::narrow<int64_t>(len);
- });
- if (write_status < 0) {
- logger_->log_error("ConsumeMQTT fail for the flow with UUID %s",
processFlowFile->getUUIDStr());
- session->remove(processFlowFile);
+ std::shared_ptr<core::FlowFile> flow_file = session->create();
+ WriteCallback write_callback(message, logger_);
+ try {
+ session->write(flow_file, write_callback);
+ } catch (const Exception& ex) {
+ logger_->log_error("Error when processing message queue: %s", ex.what());
+ }
+ if (!write_callback.getSuccessStatus()) {
+ logger_->log_error("ConsumeMQTT fail for the flow with UUID %s",
flow_file->getUUIDStr());
+ session->remove(flow_file);
} else {
- session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
- session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
- logger_->log_debug("ConsumeMQTT processing success for the flow with
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
- session->transfer(processFlowFile, Success);
+ putUserPropertiesAsAttributes(message, flow_file, session);
+ session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+ session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+ fillAttributeFromContentType(message, flow_file, session);
+ logger_->log_debug("ConsumeMQTT processing success for the flow with
UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+ session->transfer(flow_file, Success);
}
- msg_queue.pop_front();
+ msg_queue.pop();
+ }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+ std::queue<SmartMessage> msg_queue;
+ SmartMessage message;
+ while (queue_.try_dequeue(message)) {
+ msg_queue.push(std::move(message));
+ }
+ return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const
std::shared_ptr<io::OutputStream>& stream) {
+ if (message_.contents->payloadlen < 0) {
+ success_status_ = false;
+ logger_->log_error("Payload length of message is negative, value is [%d]",
message_.contents->payloadlen);
+ return -1;
+ }
+
+ const auto len =
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload),
gsl::narrow<size_t>(message_.contents->payloadlen));
+ if (io::isError(len)) {
+ success_status_ = false;
+ logger_->log_error("Stream writing error when processing message");
+ return -1;
+ }
+
+ return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message,
const std::shared_ptr<core::FlowFile>& flow_file, const
std::shared_ptr<core::ProcessSession>& session) const {
+ if (mqtt_version_.value() != MqttVersions::V_5_0) {
+ return;
}
+
+ const auto property_count =
MQTTProperties_propertyCount(&message.contents->properties,
MQTTPROPERTY_CODE_USER_PROPERTY);
+ for (int i=0; i < property_count; ++i) {
+ MQTTProperty* property =
MQTTProperties_getPropertyAt(&message.contents->properties,
MQTTPROPERTY_CODE_USER_PROPERTY, i);
+ std::string key(property->value.data.data, property->value.data.len);
+ std::string value(property->value.value.data, property->value.value.len);
+ session->putAttribute(flow_file, key, value);
+ }
+}
+
+void ConsumeMQTT::fillAttributeFromContentType(const SmartMessage& message,
const std::shared_ptr<core::FlowFile>& flow_file, const
std::shared_ptr<core::ProcessSession>& session) const {
+ if (mqtt_version_.value() != MqttVersions::V_5_0 ||
attribute_from_content_type_.empty()) {
+ return;
+ }
+
+ MQTTProperty* property =
MQTTProperties_getProperty(&message.contents->properties,
MQTTPROPERTY_CODE_CONTENT_TYPE);
+ if (property == nullptr) {
+ return;
+ }
+
+ std::string content_type(property->value.data.data,
property->value.data.len);
+ session->putAttribute(flow_file, attribute_from_content_type_, content_type);
}
-bool ConsumeMQTT::startupClient() {
+void ConsumeMQTT::startupClient() {
MQTTAsync_responseOptions response_options =
MQTTAsync_responseOptions_initializer;
response_options.context = this;
- response_options.onSuccess = subscriptionSuccess;
- response_options.onFailure = subscriptionFailure;
- const int ret = MQTTAsync_subscribe(client_, topic_.c_str(),
gsl::narrow<int>(qos_), &response_options);
+
+ if (mqtt_version_.value() == MqttVersions::V_5_0) {
+ response_options.onSuccess5 = subscriptionSuccess5;
+ response_options.onFailure5 = subscriptionFailure5;
+ } else {
+ response_options.onSuccess = subscriptionSuccess;
+ response_options.onFailure = subscriptionFailure;
+ }
+
+ const int ret = MQTTAsync_subscribe(client_, topic_.c_str(),
gsl::narrow<int>(qos_.value()), &response_options);
if (ret != MQTTASYNC_SUCCESS) {
logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_,
ret);
- return false;
+ return;
}
logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
- return true;
}
-void ConsumeMQTT::onMessageReceived(char* topic_name, int /*topic_len*/,
MQTTAsync_message* message) {
- MQTTAsync_free(topic_name);
+void ConsumeMQTT::onMessageReceived(SmartMessage smart_message) {
+ if (mqtt_version_ == MqttVersions::V_5_0) {
+ resolveTopicFromAlias(smart_message);
+ }
+
+ if (smart_message.topic.empty()) {
+ logger_->log_error("Received message without topic");
+ return;
+ }
+
+ enqueueReceivedMQTTMsg(std::move(smart_message));
+}
+
+void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) {
+ auto raw_alias =
MQTTProperties_getNumericValue(&smart_message.contents->properties,
MQTTPROPERTY_CODE_TOPIC_ALIAS);
+
+ std::optional<uint16_t> alias;
+ if (raw_alias != PAHO_MQTT_C_FAILURE_CODE) {
+ alias = gsl::narrow<uint16_t>(raw_alias);
+ }
+
+ auto& topic = smart_message.topic;
- const auto* msgPayload = reinterpret_cast<const char*>(message->payload);
- const size_t msgLen = message->payloadlen;
- const std::string messageText(msgPayload, msgLen);
- logger_->log_debug("Received message \"%s\" to MQTT topic %s on broker %s",
messageText, topic_, uri_);
+ if (alias.has_value()) {
+ if (*alias > topic_alias_maximum_) {
+ logger_->log_error("Broker does not respect client's Topic Alias
Maximum, sent a greater value: %" PRIu16 " > %" PRIu16, *alias,
topic_alias_maximum_);
+ return;
+ }
- std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> smartMessage(message);
- enqueueReceivedMQTTMsg(std::move(smartMessage));
+ // if topic is empty, this is just a usage of a previously stored alias
(look it up), otherwise a new one (store it)
+ if (topic.empty()) {
+ const auto iter = alias_to_topic_.find(*alias);
+ if (iter == alias_to_topic_.end()) {
+ logger_->log_error("Broker sent an alias that was not known to client
before: %" PRIu16, *alias);
+ } else {
+ topic = iter->second;
+ }
+ } else {
+ alias_to_topic_[*alias] = topic;
+ }
+ } else if (topic.empty()) {
+ logger_->log_error("Received message without topic and alias");
+ }
}
void ConsumeMQTT::checkProperties() {
- if (!cleanSession_ && clientID_.empty()) {
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client
ID for durable (non-clean) sessions");
+ if (mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ ==
MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO) {
+ if (isPropertyExplicitlySet(CleanStart)) {
+ logger_->log_warn("MQTT 3.x specification does not support Clean Start.
Property is not used.");
+ }
+ if (isPropertyExplicitlySet(SessionExpiryInterval)) {
+ logger_->log_warn("MQTT 3.x specification does not support Session
Expiry Intervals. Property is not used.");
+ }
+ if (isPropertyExplicitlySet(AttributeFromContentType)) {
+ logger_->log_warn("MQTT 3.x specification does not support Content Types
and thus attributes cannot be created from them. Property is not used.");
+ }
+ if (isPropertyExplicitlySet(TopicAliasMaximum)) {
+ logger_->log_warn("MQTT 3.x specification does not support Topic Alias
Maximum. Property is not used.");
+ }
+ if (isPropertyExplicitlySet(ReceiveMaximum)) {
+ logger_->log_warn("MQTT 3.x specification does not support Receive
Maximum. Property is not used.");
+ }
+ }
+
+ if (mqtt_version_.value() == MqttVersions::V_5_0 &&
isPropertyExplicitlySet(CleanSession)) {
+ logger_->log_warn("MQTT 5.0 specification does not support Clean Session.
Property is not used.");
+ }
+
+ if (clientID_.empty()) {
+ if (mqtt_version_.value() == MqttVersions::V_5_0) {
+ if (session_expiry_interval_ > std::chrono::seconds(0)) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a
Client ID for durable (Session Expiry Interval > 0) sessions");
+ }
+ } else if (!clean_session_) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a
Client ID for durable (non-clean) sessions");
+ }
+ }
+
+ if (qos_ == MqttQoS::LEVEL_0) {
+ if (mqtt_version_.value() == MqttVersions::V_5_0) {
+ if (session_expiry_interval_ > std::chrono::seconds(0)) {
+ logger_->log_warn("Messages are not preserved during client
disconnection "
+ "by the broker when QoS is less than 1 for durable
(Session Expiry Interval > 0) sessions. Only subscriptions are preserved.");
+ }
+ } else if (!clean_session_) {
+ logger_->log_warn("Messages are not preserved during client
disconnection "
+ "by the broker when QoS is less than 1 for durable
(non-clean) sessions. Only subscriptions are preserved.");
+ }
+ }
+}
+
+void ConsumeMQTT::checkBrokerLimitsImpl() {
+ auto hasWildcards = [] (std::string_view topic) {
+ return std::any_of(topic.begin(), topic.end(), [] (const char ch) {return
ch == '+' || ch == '#';});
+ };
+
+ if (wildcard_subscription_available_ == false && hasWildcards(topic_)) {
+ std::ostringstream os;
+ os << "Broker does not support wildcards but topic \"" << topic_ <<"\" has
them";
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+ }
+
+ if (maximum_session_expiry_interval_.has_value() && session_expiry_interval_
> maximum_session_expiry_interval_) {
+ std::ostringstream os;
+ os << "Set Session Expiry Interval (" << session_expiry_interval_.count()
<<" s) is longer then maximum supported by broker (" <<
maximum_session_expiry_interval_->count() << " s).";
Review Comment:
nitpicking:
```suggestion
os << "Set Session Expiry Interval (" <<
session_expiry_interval_.count() <<" s) is longer than the maximum supported by
the broker (" << maximum_session_expiry_interval_->count() << " s).";
```
##########
PROCESSORS.md:
##########
@@ -2364,6 +2380,8 @@ In the list below, the names of required properties
appear in bold. Any other pr
### Description
Routes FlowFiles based on their Attributes using the Attribute Expression
Language.
+Any number of user-defined dynamic properties can be added, which all support
the Attribute Expression Language. Relationships matching the name of the
properties will be added.
+FlowFiles will be routed to all the relationships whose matching property
evaluates to "true". Unmatched FlowFiles will be routed for "unmatched"
relationship, while failed ones to "failure".
Review Comment:
very minor, but:
```suggestion
FlowFiles will be routed to all the relationships whose matching property
evaluates to "true". Unmatched FlowFiles will be routed to the "unmatched"
relationship, while failed ones to "failure".
```
##########
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##########
@@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor {
freeResources();
}
+ SMART_ENUM(MqttVersions,
+ (V_3X_AUTO, "3.x AUTO"),
+ (V_3_1_0, "3.1.0"),
+ (V_3_1_1, "3.1.1"),
+ (V_5_0, "5.0"));
+
+ SMART_ENUM(MqttQoS,
+ (LEVEL_0, "0"),
+ (LEVEL_1, "1"),
+ (LEVEL_2, "2"));
+
EXTENSIONAPI static const core::Property BrokerURI;
EXTENSIONAPI static const core::Property ClientID;
+ EXTENSIONAPI static const core::Property QoS;
+ EXTENSIONAPI static const core::Property MqttVersion;
+ EXTENSIONAPI static const core::Property ConnectionTimeout;
+ EXTENSIONAPI static const core::Property KeepAliveInterval;
+ EXTENSIONAPI static const core::Property LastWillTopic;
+ EXTENSIONAPI static const core::Property LastWillMessage;
+ EXTENSIONAPI static const core::Property LastWillQoS;
+ EXTENSIONAPI static const core::Property LastWillRetain;
+ EXTENSIONAPI static const core::Property LastWillContentType;
EXTENSIONAPI static const core::Property Username;
EXTENSIONAPI static const core::Property Password;
- EXTENSIONAPI static const core::Property KeepAliveInterval;
- EXTENSIONAPI static const core::Property MaxFlowSegSize;
- EXTENSIONAPI static const core::Property ConnectionTimeout;
- EXTENSIONAPI static const core::Property Topic;
- EXTENSIONAPI static const core::Property QoS;
EXTENSIONAPI static const core::Property SecurityProtocol;
EXTENSIONAPI static const core::Property SecurityCA;
EXTENSIONAPI static const core::Property SecurityCert;
EXTENSIONAPI static const core::Property SecurityPrivateKey;
EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
- EXTENSIONAPI static const core::Property LastWillTopic;
- EXTENSIONAPI static const core::Property LastWillMessage;
- EXTENSIONAPI static const core::Property LastWillQoS;
- EXTENSIONAPI static const core::Property LastWillRetain;
- EXTENSIONAPI static auto properties() {
+
+ static auto basicProperties() {
+ return std::array{
+ BrokerURI,
+ ClientID,
+ MqttVersion
+ };
+ }
+
+ static auto advancedProperties() {
return std::array{
- BrokerURI,
- Topic,
- ClientID,
- QoS,
- ConnectionTimeout,
- KeepAliveInterval,
- MaxFlowSegSize,
- LastWillTopic,
- LastWillMessage,
- LastWillQoS,
- LastWillRetain,
- Username,
- Password,
- SecurityProtocol,
- SecurityCA,
- SecurityCert,
- SecurityPrivateKey,
- SecurityPrivateKeyPassword
+ QoS,
+ ConnectionTimeout,
+ KeepAliveInterval,
+ LastWillTopic,
+ LastWillMessage,
+ LastWillQoS,
+ LastWillRetain,
+ LastWillContentType,
+ Username,
+ Password,
+ SecurityProtocol,
+ SecurityCA,
+ SecurityCert,
+ SecurityPrivateKey,
+ SecurityPrivateKeyPassword
};
}
void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& factory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSession>& session) override;
void notifyStop() override {
freeResources();
}
protected:
+ struct MQTTMessageDeleter {
+ void operator()(MQTTAsync_message* message) {
+ MQTTAsync_freeMessage(&message);
Review Comment:
I don't think the `&` is needed here
##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (!context->getProperty(Topic).has_value()) {
+ logger_->log_error("PublishMQTT: could not get Topic");
+ }
Review Comment:
How can the `Topic` property become valid later? If it cannot, then
throwing here instead of in `onTrigger()` would make more sense. (And if it
can, then this is not an error.)
##########
libminifi/include/utils/Enum.h:
##########
@@ -127,7 +128,7 @@ namespace utils {
#define SMART_ENUM(Clazz, ...) \
struct Clazz { \
using Base = ::org::apache::nifi::minifi::utils::EnumBase; \
- enum Type { \
+ enum Type : int { \
Review Comment:
I'm not against this, but what is the reason for it?
##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
}
logger_->log_info("Reconnecting to %s", uri_);
- int ret = MQTTAsync_connect(client_, &conn_opts);
+ if (MQTTAsync_isConnected(client_)) {
+ logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+ return;
+ }
+ const int ret = MQTTAsync_connect(client_, &conn_opts);
+ MQTTProperties_free(&connect_props);
if (ret != MQTTASYNC_SUCCESS) {
- logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_,
ret);
+ logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error
code [%d]", uri_, ret);
+ return;
}
+
+ // wait until connection succeeds or fails
+ connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions&
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+ conn_opts.cleanstart = getCleanStart();
+
+ {
+ MQTTProperty property;
+ property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+ property.value.integer4 =
gsl::narrow<int>(getSessionExpiryInterval().count());
+ MQTTProperties_add(&connect_props, &property);
+ }
+
+ if (!last_will_content_type_.empty()) {
+ MQTTProperty property;
+ property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+ property.value.data.len = last_will_content_type_.length();
+ property.value.data.data =
const_cast<char*>(last_will_content_type_.data());
+ MQTTProperties_add(&will_props, &property);
+ }
+
+ conn_opts.willProperties = &will_props;
+
+ setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSession>& session) {
+ // read lock
+ std::shared_lock client_lock{client_mutex_};
+ if (client_ == nullptr) {
+ // we are shutting down
+ return;
+ }
+
+ // reconnect if needed
+ reconnect();
+
+ if (!MQTTAsync_isConnected(client_)) {
+ logger_->log_error("Could not work with MQTT broker because disconnected
to %s", uri_);
+ yield();
+ return;
+ }
+
+ onTriggerImpl(context, session);
}
void AbstractMQTTProcessor::freeResources() {
- if (client_ && MQTTAsync_isConnected(client_)) {
- MQTTAsync_disconnectOptions disconnect_options =
MQTTAsync_disconnectOptions_initializer;
- disconnect_options.context = this;
- disconnect_options.onSuccess = disconnectionSuccess;
- disconnect_options.onFailure = disconnectionFailure;
- disconnect_options.timeout =
gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
- MQTTAsync_disconnect(client_, &disconnect_options);
+ // write lock
+ std::lock_guard client_lock{client_mutex_};
+
+ if (!client_) {
+ return;
}
- if (client_) {
- MQTTAsync_destroy(&client_);
+
+ disconnect();
+
+ MQTTAsync_destroy(&client_);
+}
+
+void AbstractMQTTProcessor::disconnect() {
+ if (!MQTTAsync_isConnected(client_)) {
+ return;
+ }
+
+ MQTTAsync_disconnectOptions disconnect_options =
MQTTAsync_disconnectOptions_initializer;
+ std::packaged_task<void(MQTTAsync_successData*, MQTTAsync_successData5*,
MQTTAsync_failureData*, MQTTAsync_failureData5*)> disconnect_finished_task(
+ [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5*
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5*
failure_data_5) {
+ onDisconnectFinished(success_data, success_data_5, failure_data,
failure_data_5);
+ });
+ disconnect_options.context = &disconnect_finished_task;
+
+ if (mqtt_version_.value() == MqttVersions::V_5_0) {
+ disconnect_options.onSuccess5 = connectionSuccess5;
+ disconnect_options.onFailure5 = connectionFailure5;
+ } else {
+ disconnect_options.onSuccess = connectionSuccess;
+ disconnect_options.onFailure = connectionFailure;
+ }
+
+ disconnect_options.timeout =
gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
+
+ const int ret = MQTTAsync_disconnect(client_, &disconnect_options);
+ if (ret != MQTTASYNC_SUCCESS) {
+ logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with
error code [%d]", uri_, ret);
+ return;
+ }
+
+ // wait until connection succeeds or fails
+ disconnect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
+ auto readProperty = [response] (MQTTPropertyCodes property_code, auto&
out_var) {
+ const int value = MQTTProperties_getNumericValue(&response->properties,
property_code);
+ if (value != PAHO_MQTT_C_FAILURE_CODE) {
+ if constexpr (std::is_same_v<decltype(out_var),
std::optional<std::chrono::seconds>&>) {
+ out_var = std::chrono::seconds(value);
+ } else {
+ out_var = gsl::narrow<typename
std::remove_reference_t<decltype(out_var)>::value_type>(value);
+ }
+ } else {
+ out_var.reset();
+ }
+ };
+
+ readProperty(MQTTPROPERTY_CODE_RETAIN_AVAILABLE, retain_available_);
+ readProperty(MQTTPROPERTY_CODE_WILDCARD_SUBSCRIPTION_AVAILABLE,
wildcard_subscription_available_);
+ readProperty(MQTTPROPERTY_CODE_SHARED_SUBSCRIPTION_AVAILABLE,
shared_subscription_available_);
+
+ readProperty(MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM,
broker_topic_alias_maximum_);
+ readProperty(MQTTPROPERTY_CODE_RECEIVE_MAXIMUM, broker_receive_maximum_);
+ readProperty(MQTTPROPERTY_CODE_MAXIMUM_QOS, maximum_qos_);
+ readProperty(MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE, maximum_packet_size_);
+
+ readProperty(MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL,
maximum_session_expiry_interval_);
+ readProperty(MQTTPROPERTY_CODE_SERVER_KEEP_ALIVE, server_keep_alive_);
+}
+
+void AbstractMQTTProcessor::checkBrokerLimits() {
+ try {
+ if (server_keep_alive_.has_value() && server_keep_alive_ <
keep_alive_interval_) {
+ std::ostringstream os;
+ os << "Set Keep Alive Interval (" << keep_alive_interval_.count() << "
s) is longer then maximum supported by broker (" << server_keep_alive_->count()
<< " s)";
+ throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION,
os.str());
+ }
+
+ if (maximum_qos_.has_value() && qos_.value() > maximum_qos_) {
+ std::ostringstream os;
+ os << "Set QoS (" << qos_.value() << ") is higher than maximum supported
by broker (" << *maximum_qos_ << ")";
+ throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION,
os.str());
+ }
+
+ checkBrokerLimitsImpl();
+ }
+ catch (...) {
+ disconnect();
+ throw;
+ }
+}
+
+void AbstractMQTTProcessor::connectionLost(void *context, char* cause) {
+ auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
+ processor->onConnectionLost(cause);
+}
+
+
+void AbstractMQTTProcessor::connectionSuccess(void* context,
MQTTAsync_successData* response) {
+ auto* task =
reinterpret_cast<std::packaged_task<void(MQTTAsync_successData*,
MQTTAsync_successData5*, MQTTAsync_failureData*,
MQTTAsync_failureData5*)>*>(context);
Review Comment:
Check with @szaszm because I don't want you to change this and then have to
revert the change, but I think this `packaged_task<...>` type is long enough
and frequently-used enough to merit a typedef.
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message,
MQTTMessageDeleter> message) {
- if (queue_.size_approx() >= maxQueueSize_) {
- logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+ if (queue_.size_approx() >= max_queue_size_) {
+ logger_->log_error("MQTT queue full");
return;
}
- if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
- logger_->log_debug("MQTT message was truncated while enqueuing, original
length: %d", message->payloadlen);
- message->payloadlen = gsl::narrow<int>(max_seg_size_);
- }
-
- logger_->log_debug("enqueuing MQTT message with length %d",
message->payloadlen);
+ logger_->log_debug("enqueuing MQTT message with length %d",
message.contents->payloadlen);
queue_.enqueue(std::move(message));
}
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (auto value = context->getProperty(Topic)) {
+ topic_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
if (const auto value = context->getProperty<bool>(CleanSession)) {
- cleanSession_ = *value;
- logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+ clean_session_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+ if (const auto value = context->getProperty<bool>(CleanStart)) {
+ clean_start_ = *value;
+ }
+ logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+ if (const auto session_expiry_interval =
context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+ session_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+ }
+ logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s",
int64_t{session_expiry_interval_.count()});
if (const auto value =
context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
- maxQueueSize_ = *value;
- logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
maxQueueSize_);
+ max_queue_size_ = *value;
}
+ logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]",
max_queue_size_);
- // this connects to broker, so properties of this processor must be read
before
- AbstractMQTTProcessor::onSchedule(context, factory);
-}
+ if (auto value = context->getProperty(AttributeFromContentType)) {
+ attribute_from_content_type_ = std::move(*value);
+ }
+ logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]",
attribute_from_content_type_);
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+ if (const auto topic_alias_maximum =
context->getProperty<uint32_t>(TopicAliasMaximum)) {
+ topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+ }
+ logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]",
topic_alias_maximum_);
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not consume from MQTT broker because
disconnected to %s", uri_);
- yield();
- return;
+ if (const auto receive_maximum =
context->getProperty<uint32_t>(ReceiveMaximum)) {
+ receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
}
+ logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]",
receive_maximum_);
+}
- std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
- getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+ std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
while (!msg_queue.empty()) {
const auto& message = msg_queue.front();
- std::shared_ptr<core::FlowFile> processFlowFile = session->create();
- int write_status{};
- session->write(processFlowFile, [&message, &write_status](const
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
- if (message->payloadlen < 0) {
- write_status = -1;
- return -1;
- }
- const auto len =
stream->write(reinterpret_cast<uint8_t*>(message->payload),
gsl::narrow<size_t>(message->payloadlen));
- if (io::isError(len)) {
- write_status = -1;
- return -1;
- }
- return gsl::narrow<int64_t>(len);
- });
- if (write_status < 0) {
- logger_->log_error("ConsumeMQTT fail for the flow with UUID %s",
processFlowFile->getUUIDStr());
- session->remove(processFlowFile);
+ std::shared_ptr<core::FlowFile> flow_file = session->create();
+ WriteCallback write_callback(message, logger_);
+ try {
+ session->write(flow_file, write_callback);
+ } catch (const Exception& ex) {
+ logger_->log_error("Error when processing message queue: %s", ex.what());
+ }
+ if (!write_callback.getSuccessStatus()) {
+ logger_->log_error("ConsumeMQTT fail for the flow with UUID %s",
flow_file->getUUIDStr());
+ session->remove(flow_file);
} else {
- session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
- session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
- logger_->log_debug("ConsumeMQTT processing success for the flow with
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
- session->transfer(processFlowFile, Success);
+ putUserPropertiesAsAttributes(message, flow_file, session);
+ session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+ session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+ fillAttributeFromContentType(message, flow_file, session);
+ logger_->log_debug("ConsumeMQTT processing success for the flow with
UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+ session->transfer(flow_file, Success);
}
- msg_queue.pop_front();
+ msg_queue.pop();
+ }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+ std::queue<SmartMessage> msg_queue;
+ SmartMessage message;
+ while (queue_.try_dequeue(message)) {
+ msg_queue.push(std::move(message));
+ }
+ return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const
std::shared_ptr<io::OutputStream>& stream) {
+ if (message_.contents->payloadlen < 0) {
+ success_status_ = false;
+ logger_->log_error("Payload length of message is negative, value is [%d]",
message_.contents->payloadlen);
+ return -1;
+ }
+
+ const auto len =
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload),
gsl::narrow<size_t>(message_.contents->payloadlen));
+ if (io::isError(len)) {
+ success_status_ = false;
+ logger_->log_error("Stream writing error when processing message");
+ return -1;
+ }
+
+ return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message,
const std::shared_ptr<core::FlowFile>& flow_file, const
std::shared_ptr<core::ProcessSession>& session) const {
+ if (mqtt_version_.value() != MqttVersions::V_5_0) {
+ return;
}
+
+ const auto property_count =
MQTTProperties_propertyCount(&message.contents->properties,
MQTTPROPERTY_CODE_USER_PROPERTY);
+ for (int i=0; i < property_count; ++i) {
+ MQTTProperty* property =
MQTTProperties_getPropertyAt(&message.contents->properties,
MQTTPROPERTY_CODE_USER_PROPERTY, i);
+ std::string key(property->value.data.data, property->value.data.len);
+ std::string value(property->value.value.data, property->value.value.len);
+ session->putAttribute(flow_file, key, value);
+ }
+}
+
+void ConsumeMQTT::fillAttributeFromContentType(const SmartMessage& message,
const std::shared_ptr<core::FlowFile>& flow_file, const
std::shared_ptr<core::ProcessSession>& session) const {
+ if (mqtt_version_.value() != MqttVersions::V_5_0 ||
attribute_from_content_type_.empty()) {
+ return;
+ }
+
+ MQTTProperty* property =
MQTTProperties_getProperty(&message.contents->properties,
MQTTPROPERTY_CODE_CONTENT_TYPE);
+ if (property == nullptr) {
+ return;
+ }
+
+ std::string content_type(property->value.data.data,
property->value.data.len);
+ session->putAttribute(flow_file, attribute_from_content_type_, content_type);
}
-bool ConsumeMQTT::startupClient() {
+void ConsumeMQTT::startupClient() {
MQTTAsync_responseOptions response_options =
MQTTAsync_responseOptions_initializer;
response_options.context = this;
- response_options.onSuccess = subscriptionSuccess;
- response_options.onFailure = subscriptionFailure;
- const int ret = MQTTAsync_subscribe(client_, topic_.c_str(),
gsl::narrow<int>(qos_), &response_options);
+
+ if (mqtt_version_.value() == MqttVersions::V_5_0) {
+ response_options.onSuccess5 = subscriptionSuccess5;
+ response_options.onFailure5 = subscriptionFailure5;
+ } else {
+ response_options.onSuccess = subscriptionSuccess;
+ response_options.onFailure = subscriptionFailure;
+ }
+
+ const int ret = MQTTAsync_subscribe(client_, topic_.c_str(),
gsl::narrow<int>(qos_.value()), &response_options);
if (ret != MQTTASYNC_SUCCESS) {
logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_,
ret);
- return false;
+ return;
}
logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
- return true;
}
-void ConsumeMQTT::onMessageReceived(char* topic_name, int /*topic_len*/,
MQTTAsync_message* message) {
- MQTTAsync_free(topic_name);
+void ConsumeMQTT::onMessageReceived(SmartMessage smart_message) {
+ if (mqtt_version_ == MqttVersions::V_5_0) {
+ resolveTopicFromAlias(smart_message);
+ }
+
+ if (smart_message.topic.empty()) {
+ logger_->log_error("Received message without topic");
+ return;
+ }
+
+ enqueueReceivedMQTTMsg(std::move(smart_message));
+}
+
+void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) {
+ auto raw_alias =
MQTTProperties_getNumericValue(&smart_message.contents->properties,
MQTTPROPERTY_CODE_TOPIC_ALIAS);
+
+ std::optional<uint16_t> alias;
+ if (raw_alias != PAHO_MQTT_C_FAILURE_CODE) {
+ alias = gsl::narrow<uint16_t>(raw_alias);
+ }
+
+ auto& topic = smart_message.topic;
- const auto* msgPayload = reinterpret_cast<const char*>(message->payload);
- const size_t msgLen = message->payloadlen;
- const std::string messageText(msgPayload, msgLen);
- logger_->log_debug("Received message \"%s\" to MQTT topic %s on broker %s",
messageText, topic_, uri_);
+ if (alias.has_value()) {
+ if (*alias > topic_alias_maximum_) {
Review Comment:
```suggestion
if (topic_alias_maximum_ != 0 && *alias > topic_alias_maximum_) {
```
?
##########
PROCESSORS.md:
##########
@@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a
MQTT broker for a specifie
In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
-| Name | Default Value | Allowable Values | Description
|
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------|
-| **Broker URI** | | | The URI to use to
connect to the MQTT broker
|
-| **Topic** | | | The topic to
subscribe to
|
-| Client ID | | | MQTT client ID to
use
|
-| Quality of Service | 0 | | The Quality of
Service (QoS) to receive the message with. Accepts three values '0', '1' and
'2' |
-| Connection Timeout | 30 sec | | Maximum time
interval the client will wait for the network connection to the MQTT broker
|
-| Keep Alive Interval | 60 sec | | Defines the
maximum time interval between messages being sent to the broker
|
-| Max Flow Segment Size | | | Maximum flow
content payload segment size for the MQTT record
|
-| Last Will Topic | | | The topic to send
the client's Last Will to. If the Last Will topic is not set then a Last Will
will not be sent |
-| Last Will Message | | | The message to
send as the client's Last Will. If the Last Will Message is empty, Last Will
will be deleted from the broker |
-| Last Will QoS | 0 | | The Quality of
Service (QoS) to send the last will with. Accepts three values '0', '1' and '2'
|
-| Last Will Retain | false | | Whether to retain
the client's Last Will
|
-| Security Protocol | | | Protocol used to
communicate with brokers
|
-| Security CA | | | File or directory
path to CA certificate(s) for verifying the broker's key
|
-| Security Cert | | | Path to client's
public key (PEM) used for authentication
|
-| Security Private Key | | | Path to client's
private key (PEM) used for authentication
|
-| Security Pass Phrase | | | Private key
passphrase
|
-| Username | | | Username to use
when connecting to the broker
|
-| Password | | | Password to use
when connecting to the broker
|
-| Clean Session | true | | Whether to start
afresh rather than remembering previous subscriptions
|
-| Queue Max Message | 1000 | | Maximum number of
messages allowed on the received MQTT queue
|
+| Name | Default Value | Allowable Values |
Description
|
+|-----------------------------|---------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Broker URI** | | |
The URI to use to connect to the MQTT broker
|
+| Client ID | | |
MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!
|
+| MQTT Version | 3.x AUTO | 3.x AUTO, 3.1.0, 3.1.1, 5.0 |
The MQTT specification version when connecting to the broker.
|
+| **Topic** | | |
The topic to subscribe to.
|
+| Clean Session | true | |
Whether to start afresh rather than remembering previous subscriptions. Also
make broker remember subscriptions after disconnected. WARNING: MQTT 3.x only. |
Review Comment:
the second sentence is not clear to me: is the broker going to remember
subscriptions after a disconnect when this is set to `true` or to `false`?
##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (!context->getProperty(Topic).has_value()) {
+ logger_->log_error("PublishMQTT: could not get Topic");
+ }
+
if (const auto retain_opt = context->getProperty<bool>(Retain)) {
retain_ = *retain_opt;
}
logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
- AbstractMQTTProcessor::onSchedule(context, factory);
+ if (const auto message_expiry_interval =
context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+ message_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+ logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s",
int64_t{message_expiry_interval_->count()});
+ }
+
+ in_flight_message_counter_.setMqttVersion(mqtt_version_);
+ in_flight_message_counter_.setQoS(qos_);
}
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession> &session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not publish to MQTT broker because disconnected
to %s", uri_);
- yield();
+ if (!flow_file) {
return;
}
- std::shared_ptr<core::FlowFile> flowFile = session->get();
+ // broker's Receive Maximum can change after reconnect
+
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
- if (!flowFile) {
- return;
+ const auto topic = getTopic(context, flow_file);
+ try {
+ const auto result = session->readBuffer(flow_file);
+ if (result.status < 0 || !sendMessage(result.buffer, topic,
getContentType(context, flow_file), flow_file)) {
+ logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on
broker %s", flow_file->getUUIDStr(), topic, uri_);
+ session->transfer(flow_file, Failure);
+ return;
+ }
+ logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT
topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+ session->transfer(flow_file, Success);
+ } catch (const Exception& ex) {
+ logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on
broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_,
ex.what());
+ session->transfer(flow_file, Failure);
+ }
+}
+
+bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const
std::string& topic, const std::string& content_type, const
std::shared_ptr<core::FlowFile>& flow_file) {
+ if (buffer.size() > 268'435'455) {
+ logger_->log_error("Sending message failed because MQTT limit maximum
packet size [268'435'455] is exceeded by FlowFile of [%zu]", buffer.size());
}
- PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_,
topic_, client_, gsl::narrow<int>(qos_), retain_);
- session->read(flowFile, std::ref(callback));
- if (callback.status_ < 0) {
- logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
- session->transfer(flowFile, Failure);
+ if (maximum_packet_size_.has_value() && buffer.size() >
*(maximum_packet_size_)) {
+ logger_->log_error("Sending message failed because broker-requested
maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]",
+ *maximum_packet_size_, buffer.size());
+ }
+
+ MQTTAsync_message message_to_publish = MQTTAsync_message_initializer;
+ message_to_publish.payload = const_cast<std::byte*>(buffer.data());
+ message_to_publish.payloadlen = buffer.size();
+ message_to_publish.qos = qos_.value();
+ message_to_publish.retained = retain_;
+
+ setMqtt5Properties(message_to_publish, content_type, flow_file);
+
+ MQTTAsync_responseOptions response_options =
MQTTAsync_responseOptions_initializer;
+ if (mqtt_version_ == MqttVersions::V_5_0) {
+ response_options.onSuccess5 = sendSuccess5;
+ response_options.onFailure5 = sendFailure5;
} else {
- logger_->log_debug("Sent flow with length %d to MQTT topic %s",
callback.read_size_, topic_);
- session->transfer(flowFile, Success);
- }
-}
-
-int64_t PublishMQTT::ReadCallback::operator()(const
std::shared_ptr<io::InputStream>& stream) {
- if (flow_size_ < max_seg_size_)
- max_seg_size_ = flow_size_;
- gsl_Expects(max_seg_size_ <
gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
- std::vector<std::byte> buffer(max_seg_size_);
- read_size_ = 0;
- status_ = 0;
- while (read_size_ < flow_size_) {
- // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
- const auto readRet = stream->read(buffer);
- if (io::isError(readRet)) {
- status_ = -1;
- return gsl::narrow<int64_t>(read_size_);
+ response_options.onSuccess = sendSuccess;
+ response_options.onFailure = sendFailure;
+ }
+
+ // save context for callback
+ std::packaged_task<bool(bool, std::optional<int>,
std::optional<MQTTReasonCodes>)> send_finished_task(
+ [this] (const bool success, const std::optional<int> response_code,
const std::optional<MQTTReasonCodes> reason_code) {
+ return notify(success, response_code, reason_code);
+ });
+ response_options.context = &send_finished_task;
+
+ in_flight_message_counter_.increase();
+
+ const int error_code = MQTTAsync_sendMessage(client_, topic.c_str(),
&message_to_publish, &response_options);
+ if (error_code != MQTTASYNC_SUCCESS) {
+ logger_->log_error("MQTTAsync_sendMessage failed on topic '%s', MQTT
broker %s with error code [%d]", topic, uri_, error_code);
+ // early fail, sending attempt did not succeed, no need to wait for
callback
+ in_flight_message_counter_.decrease();
+ return false;
+ }
+
+ return send_finished_task.get_future().get();
+}
+
+void PublishMQTT::checkProperties() {
+ if ((mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ ==
MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO)) {
+ if (isPropertyExplicitlySet(MessageExpiryInterval)) {
+ logger_->log_warn("MQTT 3.x specification does not support Message
Expiry Intervals. Property is not used.");
+ }
+ if (isPropertyExplicitlySet(ContentType)) {
+ logger_->log_warn("MQTT 3.x specification does not support Content
Types. Property is not used.");
+ }
+ }
+}
+
+void PublishMQTT::checkBrokerLimitsImpl() {
+ if (retain_available_.has_value() && !*retain_available_ && retain_) {
Review Comment:
I would do
```suggestion
if (retain_available_ == false && retain_) {
```
here, too
##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (!context->getProperty(Topic).has_value()) {
+ logger_->log_error("PublishMQTT: could not get Topic");
+ }
+
if (const auto retain_opt = context->getProperty<bool>(Retain)) {
retain_ = *retain_opt;
}
logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
- AbstractMQTTProcessor::onSchedule(context, factory);
+ if (const auto message_expiry_interval =
context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+ message_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+ logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s",
int64_t{message_expiry_interval_->count()});
+ }
+
+ in_flight_message_counter_.setMqttVersion(mqtt_version_);
+ in_flight_message_counter_.setQoS(qos_);
}
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession> &session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not publish to MQTT broker because disconnected
to %s", uri_);
- yield();
+ if (!flow_file) {
return;
}
- std::shared_ptr<core::FlowFile> flowFile = session->get();
+ // broker's Receive Maximum can change after reconnect
+
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
- if (!flowFile) {
- return;
+ const auto topic = getTopic(context, flow_file);
+ try {
+ const auto result = session->readBuffer(flow_file);
+ if (result.status < 0 || !sendMessage(result.buffer, topic,
getContentType(context, flow_file), flow_file)) {
+ logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on
broker %s", flow_file->getUUIDStr(), topic, uri_);
+ session->transfer(flow_file, Failure);
+ return;
+ }
+ logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT
topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+ session->transfer(flow_file, Success);
+ } catch (const Exception& ex) {
+ logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on
broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_,
ex.what());
+ session->transfer(flow_file, Failure);
+ }
+}
+
+bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const
std::string& topic, const std::string& content_type, const
std::shared_ptr<core::FlowFile>& flow_file) {
+ if (buffer.size() > 268'435'455) {
+ logger_->log_error("Sending message failed because MQTT limit maximum
packet size [268'435'455] is exceeded by FlowFile of [%zu]", buffer.size());
}
- PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_,
topic_, client_, gsl::narrow<int>(qos_), retain_);
- session->read(flowFile, std::ref(callback));
- if (callback.status_ < 0) {
- logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
- session->transfer(flowFile, Failure);
+ if (maximum_packet_size_.has_value() && buffer.size() >
*(maximum_packet_size_)) {
+ logger_->log_error("Sending message failed because broker-requested
maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]",
+ *maximum_packet_size_, buffer.size());
+ }
+
+ MQTTAsync_message message_to_publish = MQTTAsync_message_initializer;
+ message_to_publish.payload = const_cast<std::byte*>(buffer.data());
+ message_to_publish.payloadlen = buffer.size();
+ message_to_publish.qos = qos_.value();
+ message_to_publish.retained = retain_;
+
+ setMqtt5Properties(message_to_publish, content_type, flow_file);
+
+ MQTTAsync_responseOptions response_options =
MQTTAsync_responseOptions_initializer;
+ if (mqtt_version_ == MqttVersions::V_5_0) {
+ response_options.onSuccess5 = sendSuccess5;
+ response_options.onFailure5 = sendFailure5;
} else {
- logger_->log_debug("Sent flow with length %d to MQTT topic %s",
callback.read_size_, topic_);
- session->transfer(flowFile, Success);
- }
-}
-
-int64_t PublishMQTT::ReadCallback::operator()(const
std::shared_ptr<io::InputStream>& stream) {
- if (flow_size_ < max_seg_size_)
- max_seg_size_ = flow_size_;
- gsl_Expects(max_seg_size_ <
gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
- std::vector<std::byte> buffer(max_seg_size_);
- read_size_ = 0;
- status_ = 0;
- while (read_size_ < flow_size_) {
- // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
- const auto readRet = stream->read(buffer);
- if (io::isError(readRet)) {
- status_ = -1;
- return gsl::narrow<int64_t>(read_size_);
+ response_options.onSuccess = sendSuccess;
+ response_options.onFailure = sendFailure;
+ }
+
+ // save context for callback
+ std::packaged_task<bool(bool, std::optional<int>,
std::optional<MQTTReasonCodes>)> send_finished_task(
+ [this] (const bool success, const std::optional<int> response_code,
const std::optional<MQTTReasonCodes> reason_code) {
+ return notify(success, response_code, reason_code);
+ });
+ response_options.context = &send_finished_task;
+
+ in_flight_message_counter_.increase();
+
+ const int error_code = MQTTAsync_sendMessage(client_, topic.c_str(),
&message_to_publish, &response_options);
+ if (error_code != MQTTASYNC_SUCCESS) {
+ logger_->log_error("MQTTAsync_sendMessage failed on topic '%s', MQTT
broker %s with error code [%d]", topic, uri_, error_code);
+ // early fail, sending attempt did not succeed, no need to wait for
callback
+ in_flight_message_counter_.decrease();
+ return false;
+ }
+
+ return send_finished_task.get_future().get();
+}
+
+void PublishMQTT::checkProperties() {
+ if ((mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ ==
MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO)) {
+ if (isPropertyExplicitlySet(MessageExpiryInterval)) {
+ logger_->log_warn("MQTT 3.x specification does not support Message
Expiry Intervals. Property is not used.");
+ }
+ if (isPropertyExplicitlySet(ContentType)) {
+ logger_->log_warn("MQTT 3.x specification does not support Content
Types. Property is not used.");
+ }
+ }
+}
+
+void PublishMQTT::checkBrokerLimitsImpl() {
+ if (retain_available_.has_value() && !*retain_available_ && retain_) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Retain was set but broker
does not support it");
+ }
+}
+
+void PublishMQTT::sendSuccess(void* context, MQTTAsync_successData*
/*response*/) {
+ auto send_finished_task = reinterpret_cast<std::packaged_task<void(bool,
std::optional<int>, std::optional<MQTTReasonCodes>)>*>(context);
+ (*send_finished_task)(true, std::nullopt, std::nullopt);
+}
+
+void PublishMQTT::sendSuccess5(void* context, MQTTAsync_successData5*
response) {
+ auto send_finished_task = reinterpret_cast<std::packaged_task<void(bool,
std::optional<int>, std::optional<MQTTReasonCodes>)>*>(context);
+ (*send_finished_task)(true, std::nullopt, response->reasonCode);
+}
+
+void PublishMQTT::sendFailure(void* context, MQTTAsync_failureData* response) {
+ auto send_finished_task = reinterpret_cast<std::packaged_task<void(bool,
std::optional<int>, std::optional<MQTTReasonCodes>)>*>(context);
+ (*send_finished_task)(false, response->code, std::nullopt);
+}
+
+void PublishMQTT::sendFailure5(void* context, MQTTAsync_failureData5*
response) {
+ auto send_finished_task = reinterpret_cast<std::packaged_task<void(bool,
std::optional<int>, std::optional<MQTTReasonCodes>)>*>(context);
+ (*send_finished_task)(false, response->code, response->reasonCode);
+}
+
+bool PublishMQTT::notify(const bool success, const std::optional<int>
response_code, const std::optional<MQTTReasonCodes> reason_code) {
+ in_flight_message_counter_.decrease();
+
+ if (success) {
+ logger_->log_debug("Successfully sent message to MQTT broker %s", uri_);
+ if (reason_code.has_value()) {
+ logger_->log_error("Additional reason code for sending success: %d: %s",
*reason_code, MQTTReasonCode_toString(*reason_code));
Review Comment:
in `ConsumeMQTT`, we only use `MQTTReasonCode_toString()` in the v5.0 case;
is it OK to use it here in the v3.x case, too?
##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (!context->getProperty(Topic).has_value()) {
+ logger_->log_error("PublishMQTT: could not get Topic");
+ }
+
if (const auto retain_opt = context->getProperty<bool>(Retain)) {
retain_ = *retain_opt;
}
logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
- AbstractMQTTProcessor::onSchedule(context, factory);
+ if (const auto message_expiry_interval =
context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+ message_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+ logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s",
int64_t{message_expiry_interval_->count()});
+ }
+
+ in_flight_message_counter_.setMqttVersion(mqtt_version_);
+ in_flight_message_counter_.setQoS(qos_);
}
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession> &session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not publish to MQTT broker because disconnected
to %s", uri_);
- yield();
+ if (!flow_file) {
return;
}
- std::shared_ptr<core::FlowFile> flowFile = session->get();
+ // broker's Receive Maximum can change after reconnect
+
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
- if (!flowFile) {
- return;
+ const auto topic = getTopic(context, flow_file);
+ try {
+ const auto result = session->readBuffer(flow_file);
+ if (result.status < 0 || !sendMessage(result.buffer, topic,
getContentType(context, flow_file), flow_file)) {
+ logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on
broker %s", flow_file->getUUIDStr(), topic, uri_);
+ session->transfer(flow_file, Failure);
+ return;
+ }
+ logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT
topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+ session->transfer(flow_file, Success);
+ } catch (const Exception& ex) {
+ logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on
broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_,
ex.what());
+ session->transfer(flow_file, Failure);
+ }
+}
+
+bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const
std::string& topic, const std::string& content_type, const
std::shared_ptr<core::FlowFile>& flow_file) {
+ if (buffer.size() > 268'435'455) {
Review Comment:
Can you give this constant a name which explains where it comes from,
please? Also, `0x0FFF'FFFF` would be better than `268'435'455`.
##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
setSupportedRelationships(relationships());
}
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (!context->getProperty(Topic).has_value()) {
+ logger_->log_error("PublishMQTT: could not get Topic");
+ }
+
if (const auto retain_opt = context->getProperty<bool>(Retain)) {
retain_ = *retain_opt;
}
logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
- AbstractMQTTProcessor::onSchedule(context, factory);
+ if (const auto message_expiry_interval =
context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+ message_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+ logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s",
int64_t{message_expiry_interval_->count()});
+ }
+
+ in_flight_message_counter_.setMqttVersion(mqtt_version_);
+ in_flight_message_counter_.setQoS(qos_);
}
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession> &session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not publish to MQTT broker because disconnected
to %s", uri_);
- yield();
+ if (!flow_file) {
return;
Review Comment:
We usually `yield()` in cases like this; why not here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]