szaszm commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1073689675
##########
extensions/mqtt/processors/PublishMQTT.h:
##########
@@ -62,72 +68,116 @@ class PublishMQTT : public
processors::AbstractMQTTProcessor {
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
- class ReadCallback {
+ void readProperties(const std::shared_ptr<core::ProcessContext>& context)
override;
+ void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context,
const std::shared_ptr<core::ProcessSession>& session) override;
+ void initialize() override;
+
+ private:
+ /**
+ * Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's
Receive Maximum
+ */
+ class InFlightMessageCounter {
public:
- ReadCallback(PublishMQTT* processor, uint64_t flow_size, uint64_t
max_seg_size, std::string topic, MQTTAsync client, int qos, bool retain)
- : processor_(processor),
- flow_size_(flow_size),
- max_seg_size_(max_seg_size),
- topic_(std::move(topic)),
- client_(client),
- qos_(qos),
- retain_(retain) {
+ void setMqttVersion(const MqttVersions mqtt_version) {
+ mqtt_version_ = mqtt_version;
+ }
+
+ void setQoS(const MqttQoS qos) {
+ qos_ = qos;
+ }
+
+ void setMax(const uint16_t new_limit) {
+ limit_ = new_limit;
Review Comment:
Isn't it necessary to lock the mutex here? Or if this limit can't change,
maybe it should be initialized in the constructor instead, and not changed
later.
##########
extensions/mqtt/processors/PublishMQTT.h:
##########
@@ -62,72 +68,116 @@ class PublishMQTT : public
processors::AbstractMQTTProcessor {
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
- class ReadCallback {
+ void readProperties(const std::shared_ptr<core::ProcessContext>& context)
override;
+ void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context,
const std::shared_ptr<core::ProcessSession>& session) override;
+ void initialize() override;
+
+ private:
+ /**
+ * Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's
Receive Maximum
+ */
+ class InFlightMessageCounter {
public:
- ReadCallback(PublishMQTT* processor, uint64_t flow_size, uint64_t
max_seg_size, std::string topic, MQTTAsync client, int qos, bool retain)
- : processor_(processor),
- flow_size_(flow_size),
- max_seg_size_(max_seg_size),
- topic_(std::move(topic)),
- client_(client),
- qos_(qos),
- retain_(retain) {
+ void setMqttVersion(const MqttVersions mqtt_version) {
+ mqtt_version_ = mqtt_version;
+ }
+
+ void setQoS(const MqttQoS qos) {
+ qos_ = qos;
+ }
+
+ void setMax(const uint16_t new_limit) {
+ limit_ = new_limit;
}
- int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
+ // increase on sending, wait if limit is reached
+ void increase();
Review Comment:
Is there a condition where this can block indefinitely? For example, some
messages could get stuck in the "in flight" state due to a network error, and
new triggers would then block a thread forever. It might make sense to add a
timeout parameter and a success/timeout status result.
##########
extensions/mqtt/processors/PublishMQTT.h:
##########
@@ -62,72 +68,116 @@ class PublishMQTT : public
processors::AbstractMQTTProcessor {
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
- class ReadCallback {
+ void readProperties(const std::shared_ptr<core::ProcessContext>& context)
override;
+ void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context,
const std::shared_ptr<core::ProcessSession>& session) override;
+ void initialize() override;
+
+ private:
+ /**
+ * Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's
Receive Maximum
+ */
+ class InFlightMessageCounter {
Review Comment:
I think this class is a semaphore reimplementation. Can we replace it with
`std::counting_semaphore`?
##########
extensions/mqtt/processors/PublishMQTT.cpp:
##########
@@ -29,81 +29,240 @@
namespace org::apache::nifi::minifi::processors {
+using SendFinishedTask = std::packaged_task<bool(bool, std::optional<int>,
std::optional<MQTTReasonCodes>)>;
+
void PublishMQTT::initialize() {
setSupportedProperties(properties());
setSupportedRelationships(relationships());
}
-void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>&
context) {
+ if (!context->getProperty(Topic).has_value()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "PublishMQTT: Topic is
required");
+ }
+
if (const auto retain_opt = context->getProperty<bool>(Retain)) {
retain_ = *retain_opt;
}
logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
- AbstractMQTTProcessor::onSchedule(context, factory);
+ if (const auto message_expiry_interval =
context->getProperty<core::TimePeriodValue>(MessageExpiryInterval)) {
+ message_expiry_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
+ logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s",
int64_t{message_expiry_interval_->count()});
+ }
+
+ in_flight_message_counter_.setMqttVersion(mqtt_version_);
+ in_flight_message_counter_.setQoS(qos_);
}
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
- // reconnect if needed
- reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession>& session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
- if (!MQTTAsync_isConnected(client_)) {
- logger_->log_error("Could not publish to MQTT broker because disconnected
to %s", uri_);
+ if (!flow_file) {
yield();
return;
}
- std::shared_ptr<core::FlowFile> flowFile = session->get();
+ // broker's Receive Maximum can change after reconnect
+
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
- if (!flowFile) {
- return;
+ const auto topic = getTopic(context, flow_file);
+ try {
+ const auto result = session->readBuffer(flow_file);
+ if (result.status < 0 || !sendMessage(result.buffer, topic,
getContentType(context, flow_file), flow_file)) {
+ logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on
broker %s", flow_file->getUUIDStr(), topic, uri_);
+ session->transfer(flow_file, Failure);
+ return;
+ }
+ logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT
topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+ session->transfer(flow_file, Success);
+ } catch (const Exception& ex) {
+ logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on
broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_,
ex.what());
+ session->transfer(flow_file, Failure);
+ }
+}
+
+bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const
std::string& topic, const std::string& content_type, const
std::shared_ptr<core::FlowFile>& flow_file) {
+ static const unsigned max_packet_size = 268'435'455;
Review Comment:
```suggestion
static const unsigned max_packet_size = 256_MiB - 1;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]