fgerlits commented on code in PR #1990:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1990#discussion_r2245787019
##########
docker/test/integration/cluster/checkers/sparkplug_b_pb2.py:
##########
Review Comment:
Could we generate this when building (or before running) the tests instead
of having the generated code in the repo?
##########
docker/test/integration/features/steps/steps.py:
##########
@@ -435,6 +436,18 @@ def step_impl(context):
context.test.start('mqtt-broker')
+@given("a SparkplugBReader controller service is set up")
+def step_impl(context):
+ json_record_set_reader = SparkplugBReader("SparkplugBReader")
+ container = context.test.acquire_container(context=context,
name="minifi-cpp-flow")
+ container.add_controller(json_record_set_reader)
Review Comment:
Nitpicking, but this reader is not JSON, so the variable name is misleading:
```suggestion
record_set_reader = SparkplugBReader("SparkplugBReader")
container = context.test.acquire_container(context=context,
name="minifi-cpp-flow")
container.add_controller(record_set_reader)
```
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -58,8 +84,55 @@ void ConsumeMQTT::readProperties(core::ProcessContext&
context) {
receive_maximum_ = gsl::narrow<uint16_t>(utils::parseU64Property(context,
ReceiveMaximum));
}
-void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession&
session) {
- std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
+void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records,
const std::queue<SmartMessage>& msg_queue) const {
+ if (!add_attributes_as_fields_) {
+ return;
+ }
+
+ for (auto& record: new_records) {
+ record.emplace("_topic", core::RecordField(msg_queue.front().topic));
+ auto topic_segments = utils::string::split(msg_queue.front().topic, "/");
+ for (size_t i = 0; i < topic_segments.size(); ++i) {
+ record.emplace("_topic.segment." + std::to_string(i),
core::RecordField(topic_segments[i]));
+ }
+ record.emplace("_qos", core::RecordField(msg_queue.front().contents->qos));
+ record.emplace("_isDuplicate",
core::RecordField(msg_queue.front().contents->dup > 0));
+ record.emplace("_isRetained",
core::RecordField(msg_queue.front().contents->retained > 0));
+ }
+}
+
+void ConsumeMQTT::transferMessagesAsRecords(core::ProcessSession& session) {
+ gsl_Expects(record_set_reader_ && record_set_writer_);
+ auto msg_queue = getReceivedMqttMessages();
+ core::RecordSet record_set;
+ while (!msg_queue.empty()) {
+ io::BufferStream buffer_stream;
+ buffer_stream.write(reinterpret_cast<const
uint8_t*>(msg_queue.front().contents->payload),
gsl::narrow<size_t>(msg_queue.front().contents->payloadlen));
+ auto new_records_result = record_set_reader_->read(buffer_stream);
+ if (!new_records_result) {
+ logger_->log_error("Failed to read records from MQTT message: {}",
new_records_result.error());
+ msg_queue.pop();
+ continue;
+ }
+ auto& new_records = new_records_result.value();
+ addAttributesAsRecordFields(new_records, msg_queue);
+ record_set.reserve(record_set.size() + new_records.size());
+ record_set.insert(record_set.end(),
std::make_move_iterator(new_records.begin()),
std::make_move_iterator(new_records.end()));
+ msg_queue.pop();
+ }
+ if (record_set.empty()) {
+ logger_->log_debug("No records to write, skipping FlowFile creation");
+ return;
+ }
+ std::shared_ptr<core::FlowFile> flow_file = session.create();
+ record_set_writer_->write(record_set, flow_file, session);
+ session.putAttribute(*flow_file, RecordCountOutputAttribute.name,
std::to_string(record_set.size()));
+ session.putAttribute(*flow_file, BrokerOutputAttribute.name, uri_);
+ session.transfer(flow_file, Success);
Review Comment:
In `transferMessagesAsFlowFiles`, we add a bunch of flow file attributes
using `putUserPropertiesAsAttributes` and `fillAttributeFromContentType`. Are
those not needed / not useful here?
##########
extensions/mqtt/processors/ConsumeMQTT.h:
##########
@@ -92,15 +107,26 @@ class ConsumeMQTT : public
processors::AbstractMQTTProcessor {
QueueBufferMaxMessage,
AttributeFromContentType,
TopicAliasMaximum,
- ReceiveMaximum
+ ReceiveMaximum,
+ RecordReader,
+ RecordWriter,
+ AddAttributesAsFields
}), AbstractMQTTProcessor::AdvancedProperties);
EXTENSIONAPI static constexpr auto Success =
core::RelationshipDefinition{"success", "FlowFiles that are sent successfully
to the destination are transferred to this relationship"};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success};
EXTENSIONAPI static constexpr auto BrokerOutputAttribute =
core::OutputAttributeDefinition<0>{"mqtt.broker", {}, "URI of the sending
broker"};
EXTENSIONAPI static constexpr auto TopicOutputAttribute =
core::OutputAttributeDefinition<0>{"mqtt.topic", {}, "Topic of the message"};
- EXTENSIONAPI static constexpr auto OutputAttributes =
std::array<core::OutputAttributeReference, 2>{BrokerOutputAttribute,
TopicOutputAttribute};
+ EXTENSIONAPI static constexpr auto TopicSegmentOutputAttribute =
core::OutputAttributeDefinition<0>{"mqtt.topic.segment.n", {}, "The nth topic
segment of the message"};
+ EXTENSIONAPI static constexpr auto QosOutputAttribute =
core::OutputAttributeDefinition<0>{"mqtt.qos", {}, "The quality of service for
this message."};
+ EXTENSIONAPI static constexpr auto IsDuplicateOutputAttribute =
core::OutputAttributeDefinition<0>{"mqtt.isDuplicate", {},
+ "Whether or not this message might be a duplicate of one which has
already been received."};
+ EXTENSIONAPI static constexpr auto IsRetainedOutputAttribute =
core::OutputAttributeDefinition<0>{"mqtt.isRetained", {},
+ "Whether or not this message was from a current publisher, or was
\"retained\" by the server as the last message published on the topic."};
+ EXTENSIONAPI static constexpr auto RecordCountOutputAttribute =
core::OutputAttributeDefinition<0>{"record.count", {}, "The number of records
received"};
+ EXTENSIONAPI static constexpr auto OutputAttributes =
std::array<core::OutputAttributeReference, 7>{BrokerOutputAttribute,
TopicOutputAttribute, TopicSegmentOutputAttribute,
+ QosOutputAttribute, IsDuplicateOutputAttribute,
IsRetainedOutputAttribute, RecordCountOutputAttribute};
Review Comment:
```suggestion
EXTENSIONAPI static constexpr auto OutputAttributes =
std::to_array<core::OutputAttributeReference>({BrokerOutputAttribute,
TopicOutputAttribute, TopicSegmentOutputAttribute,
QosOutputAttribute, IsDuplicateOutputAttribute,
IsRetainedOutputAttribute, RecordCountOutputAttribute});
```
Using `std::to_array` would be marginally nicer, since the array size is
deduced instead of being hard-coded.
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -26,17 +25,44 @@
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
+#include "io/BufferStream.h"
+#include "utils/ProcessorConfigUtils.h"
#include "utils/StringUtils.h"
#include "utils/ValueParser.h"
-#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
+namespace {
+template<typename RecordSetIO>
+std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context,
const core::PropertyReference& property,
+ const utils::Identifier& processor_uuid) {
+ std::string service_name = context.getProperty(property).value_or("");
+ if (!IsNullOrEmpty(service_name)) {
+ auto record_set_io =
std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name,
processor_uuid));
+ if (!record_set_io) {
+ throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION,
fmt::format("'{}' property is set to invalid controller service '{}'",
property.name, service_name));
+ }
+ return record_set_io;
+ }
+ return nullptr;
+}
Review Comment:
We could use `utils::parseOptionalControllerService()` instead of this
hand-written `getRecordSetIO` helper.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]