Github user minifirocks commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078
--- Diff: extensions/mqtt/PublishMQTT.h ---
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+ // Constructor
+ /*!
+ * Create a new processor
+ */
+ explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+ : processors::AbstractMQTTProcessor(name, uuid),
logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) {
+ retain_ = false;
+ max_seg_size_ = ULLONG_MAX;
+ }
+ // Destructor
+ virtual ~PublishMQTT() {
+ }
+ // Processor Name
+ static constexpr char const* ProcessorName = "PublishMQTT";
+ // Supported Properties
+ static core::Property Retain;
+ static core::Property MaxFlowSegSize;
+
+ // Nest Callback Class for read stream
+ class ReadCallback: public InputStreamCallback {
+ public:
+ ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const
std::string &key, MQTTClient client,
+ int qos, bool retain, MQTTClient_deliveryToken &token) :
+ flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key),
client_(client),
+ qos_(qos), retain_(retain), token_(token) {
+ status_ = 0;
+ read_size_ = 0;
+ }
+ ~ReadCallback() {
+ }
+ int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ if (flow_size_ < max_seg_size_)
+ max_seg_size_ = flow_size_;
+ std::vector<unsigned char> buffer;
+ buffer.reserve(max_seg_size_);
+ read_size_ = 0;
+ status_ = 0;
+ while (read_size_ < flow_size_) {
+ int readRet = stream->read(&buffer[0], max_seg_size_);
+ if (readRet < 0) {
+ status_ = -1;
+ return read_size_;
+ }
+ if (readRet > 0) {
+ MQTTClient_message pubmsg = MQTTClient_message_initializer;
+ pubmsg.payload = &buffer[0];
+ pubmsg.payloadlen = readRet;
+ pubmsg.qos = qos_;
+ pubmsg.retained = retain_;
+ if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg,
&token_) != MQTTCLIENT_SUCCESS) {
--- End diff --
It adds the MQTT header and calls the socket write.
The delivery-complete callback is for QoS.
/**
* This is a callback function. The client application
* must provide an implementation of this function to enable asynchronous
* notification of delivery of messages. The function is registered with the
* client library by passing it as an argument to MQTTClient_setCallbacks().
* It is called by the client library after the client application has
* published a message to the server. It indicates that the necessary
* handshaking and acknowledgements for the requested quality of service (see
* MQTTClient_message.qos) have been completed. This function is executed on a
* separate thread to the one on which the client application is running.
* <b>Note:</b>MQTTClient_deliveryComplete() is not called when messages are
* published at QoS0.
* @param context A pointer to the <i>context</i> value originally passed to
* MQTTClient_setCallbacks(), which contains any application-specific context.
* @param dt The ::MQTTClient_deliveryToken associated with
* the published message. Applications can check that all messages have been
* correctly published by matching the delivery tokens returned from calls to
* MQTTClient_publish() and MQTTClient_publishMessage() with the tokens passed
* to this callback.
*/
typedef void MQTTClient_deliveryComplete(void* context,
MQTTClient_deliveryToken dt);
---