Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794354
--- Diff: extensions/mqtt/PublishMQTT.h ---
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+ // Constructor
+ /*!
+ * Create a new processor
+ */
+ explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+ : processors::AbstractMQTTProcessor(name, uuid),
logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) {
+ retain_ = false;
+ max_seg_size_ = ULLONG_MAX;
+ }
+ // Destructor
+ virtual ~PublishMQTT() {
+ }
+ // Processor Name
+ static constexpr char const* ProcessorName = "PublishMQTT";
+ // Supported Properties
+ static core::Property Retain;
+ static core::Property MaxFlowSegSize;
+
+ // Nest Callback Class for read stream
+ class ReadCallback: public InputStreamCallback {
+ public:
+ ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const
std::string &key, MQTTClient client,
+ int qos, bool retain, MQTTClient_deliveryToken &token) :
+ flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key),
client_(client),
+ qos_(qos), retain_(retain), token_(token) {
+ status_ = 0;
+ read_size_ = 0;
+ }
+ ~ReadCallback() {
+ }
+ int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ if (flow_size_ < max_seg_size_)
+ max_seg_size_ = flow_size_;
+ std::vector<unsigned char> buffer;
+ buffer.reserve(max_seg_size_);
+ read_size_ = 0;
+ status_ = 0;
+ while (read_size_ < flow_size_) {
+ int readRet = stream->read(&buffer[0], max_seg_size_);
+ if (readRet < 0) {
+ status_ = -1;
+ return read_size_;
+ }
+ if (readRet > 0) {
+ MQTTClient_message pubmsg = MQTTClient_message_initializer;
+ pubmsg.payload = &buffer[0];
+ pubmsg.payloadlen = readRet;
+ pubmsg.qos = qos_;
+ pubmsg.retained = retain_;
+ if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg,
&token_) != MQTTCLIENT_SUCCESS) {
--- End diff --
does the publish copy the buffer? If not, does it finish entirely or will
it create a callback ? I ask because you're passing in the buffer as the
payload, but if there is a callback, we could possibly have memory that's freed
when this function exits with the MQTTClient still in progress.
---