Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794068
--- Diff: extensions/mqtt/ConsumeMQTT.h ---
@@ -0,0 +1,125 @@
+/**
+ * @file ConsumeMQTT.h
+ * ConsumeMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONSUME_MQTT_H__
+#define __CONSUME_MQTT_H__
+
+#include <climits>
+#include <deque>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
+#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
+
+// ConsumeMQTT Class
+class ConsumeMQTT: public processors::AbstractMQTTProcessor {
+public:
+ // Constructor
+ /*!
+ * Create a new processor
+ */
+ explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
+ : processors::AbstractMQTTProcessor(name, uuid),
logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) {
+ isSubscriber_ = true;
+ maxQueueSize_ = 100;
+ }
+ // Destructor
+ virtual ~ConsumeMQTT() {
+ std::lock_guard < std::mutex > lock(mutex_);
+ while (!queue_.empty()) {
+ MQTTClient_message *message = queue_.front();
+ MQTTClient_freeMessage(&message);
+ queue_.pop_front();
+ }
+ }
+ // Processor Name
+ static constexpr char const* ProcessorName = "ConsumeMQTT";
+ // Supported Properties
+ static core::Property MaxQueueSize;
+ // Nest Callback Class for write stream
+ class WriteCallback: public OutputStreamCallback {
+ public:
+ WriteCallback(MQTTClient_message *message) :
+ message_(message) {
+ status_ = 0;
+ }
+ MQTTClient_message *message_;
+ int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t len =
stream->write(reinterpret_cast<uint8_t*>(message_->payload),
message_->payloadlen);
+ if (len < 0)
+ status_ = -1;
+ return len;
+ }
+ int status_;
+ };
+
+public:
+ /**
+ * Function that's executed when the processor is scheduled.
+ * @param context process context.
+ * @param sessionFactory process session factory that is used when
creating
+ * ProcessSession objects.
+ */
+ void onSchedule(core::ProcessContext *context,
core::ProcessSessionFactory *sessionFactory);
+ // OnTrigger method, implemented by NiFi ConsumeMQTT
+ virtual void onTrigger(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSession> &session);
+ // Initialize, over write by NiFi ConsumeMQTT
+ virtual void initialize(void);
+ virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+
+protected:
+ void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
+ std::lock_guard < std::mutex > lock(mutex_);
--- End diff --
Could you use a lock free queue here? might not save much but may increase
throughput and we have one in our code base already.
---