Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 30df9a5ef -> 8dd7e91f9
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConsumeMQTT.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp new file mode 100644 index 0000000..472d35f --- /dev/null +++ b/extensions/mqtt/processors/ConsumeMQTT.cpp @@ -0,0 +1,119 @@ +/** + * @file ConsumeMQTT.cpp + * ConsumeMQTT class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "ConsumeMQTT.h" +#include <stdio.h> +#include <algorithm> +#include <memory> +#include <string> +#include <map> +#include <set> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", ""); + +void ConsumeMQTT::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + properties.insert(MaxFlowSegSize); + properties.insert(QueueBufferMaxMessage); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) { + if (queue_.size_approx() >= maxQueueSize_) { + logger_->log_debug("MQTT queue full"); + return false; + } else { + if (message->payloadlen > maxSegSize_) + message->payloadlen = maxSegSize_; + queue_.enqueue(message); + logger_->log_debug("enqueue MQTT message length %d", message->payloadlen); + return true; + } +} + +void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + AbstractMQTTProcessor::onSchedule(context, sessionFactory); + std::string value; + int64_t valInt; + value = ""; + if 
(context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { + maxQueueSize_ = valInt; + logger_->log_debug("ConsumeMQTT: Queue Max Message [%ll]", maxQueueSize_); + } + value = ""; + if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { + maxSegSize_ = valInt; + logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%ll]", maxSegSize_); + } +} + +void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + // reconnect if necessary + reconnect(); + std::deque<MQTTClient_message *> msg_queue; + getReceivedMQTTMsg(msg_queue); + while (!msg_queue.empty()) { + MQTTClient_message *message = msg_queue.front(); + std::shared_ptr<core::FlowFile> processFlowFile = session->create(); + ConsumeMQTT::WriteCallback callback(message); + session->write(processFlowFile, &callback); + if (callback.status_ < 0) { + logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr()); + session->remove(processFlowFile); + } else { + session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_.c_str()); + session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_.c_str()); + logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", processFlowFile->getUUIDStr(), topic_); + session->transfer(processFlowFile, Success); + } + MQTTClient_freeMessage(&message); + msg_queue.pop_front(); + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConsumeMQTT.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h new file 
mode 100644 index 0000000..0b26d42 --- /dev/null +++ b/extensions/mqtt/processors/ConsumeMQTT.h @@ -0,0 +1,125 @@ +/** + * @file ConsumeMQTT.h + * ConsumeMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONSUME_MQTT_H__ +#define __CONSUME_MQTT_H__ + +#include <limits> +#include <deque> +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "concurrentqueue.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic" +#define MQTT_BROKER_ATTRIBUTE "mqtt.broker" + +// ConsumeMQTT Class +class ConsumeMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL) + : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) { + isSubscriber_ = true; + maxQueueSize_ = 100; + maxSegSize_ = ULLONG_MAX; + } + // Destructor + virtual ~ConsumeMQTT() { + MQTTClient_message *message; + while (queue_.try_dequeue(message)) { + MQTTClient_freeMessage(&message); + } + } + // Processor Name + static constexpr char const* ProcessorName = "ConsumeMQTT"; + // Supported Properties + static core::Property MaxFlowSegSize; + static core::Property QueueBufferMaxMessage; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: + WriteCallback(MQTTClient_message *message) : + message_(message) { + status_ = 0; + } + MQTTClient_message *message_; + int64_t process(std::shared_ptr<io::BaseStream> stream) { + int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen); + if (len < 0) + status_ = -1; + return len; + } + int status_; + }; + +public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. 
+ */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi ConsumeMQTT + virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session); + // Initialize, over write by NiFi ConsumeMQTT + virtual void initialize(void); + virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message); + +protected: + void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) { + MQTTClient_message *message; + while (queue_.try_dequeue(message)) { + msg_queue.push_back(message); + } + } + +private: + std::shared_ptr<logging::Logger> logger_; + std::mutex mutex_; + uint64_t maxQueueSize_; + uint64_t maxSegSize_; + moodycamel::ConcurrentQueue<MQTTClient_message *> queue_; +}; + +REGISTER_RESOURCE (ConsumeMQTT); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertBase.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConvertBase.cpp b/extensions/mqtt/processors/ConvertBase.cpp new file mode 100644 index 0000000..10571d4 --- /dev/null +++ b/extensions/mqtt/processors/ConvertBase.cpp @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdio.h> +#include <algorithm> +#include <memory> +#include <string> +#include <map> +#include <set> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "ConvertBase.h" +#include "PayloadSerializer.h" +#include "utils/ByteArrayCallback.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property ConvertBase::MQTTControllerService("MQTT Controller Service", "Name of controller service that will be used for MQTT interactivity", ""); +core::Property ConvertBase::ListeningTopic("Listening Topic", "Name of topic to listen to", ""); +core::Relationship ConvertBase::Success("success", "All files are routed to success"); + +void ConvertBase::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(MQTTControllerService); + properties.insert(ListeningTopic); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void ConvertBase::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { + std::string controller_service_name = ""; + if (context->getProperty(MQTTControllerService.getName(), controller_service_name) && !controller_service_name.empty()) { + auto service = context->getControllerService(controller_service_name); + 
mqtt_service_ = std::static_pointer_cast<controllers::MQTTControllerService>(service); + } + context->getProperty(ListeningTopic.getName(), listening_topic); + if (!listening_topic.empty()) { + mqtt_service_->subscribeToTopic(listening_topic); + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertBase.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConvertBase.h b/extensions/mqtt/processors/ConvertBase.h new file mode 100644 index 0000000..6f4a41e --- /dev/null +++ b/extensions/mqtt/processors/ConvertBase.h @@ -0,0 +1,90 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef EXTENSIONS_MQTT_PROTOCOL_CONVERTBASE_H_ +#define EXTENSIONS_MQTT_PROTOCOL_CONVERTBASE_H_ + +#include "MQTTControllerService.h" +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "c2/protocols/RESTProtocol.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +/** + * Purpose: Provides base functionality for mqtt conversion classes. + * At a minimum we need a controller service and listening topic. + */ +class ConvertBase : public core::Processor, public minifi::c2::RESTProtocol { + public: + // Constructor + /*! + * Create a new processor + */ + explicit ConvertBase(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid) { + } + // Destructor + virtual ~ConvertBase() { + } + // Supported Properties + static core::Property MQTTControllerService; + static core::Property ListeningTopic; + + static core::Relationship Success; + + public: + + /** + * Initialization of the processor + */ + virtual void initialize() override; + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; + protected: + + /** + * MQTT controller service. 
+ */ + std::shared_ptr<controllers::MQTTControllerService> mqtt_service_; + + std::string listening_topic; + +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* EXTENSIONS_MQTT_PROTOCOL_CONVERTBASE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertHeartBeat.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConvertHeartBeat.cpp b/extensions/mqtt/processors/ConvertHeartBeat.cpp new file mode 100644 index 0000000..cf5574f --- /dev/null +++ b/extensions/mqtt/processors/ConvertHeartBeat.cpp @@ -0,0 +1,75 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <stdio.h> +#include <algorithm> +#include <memory> +#include <string> +#include <map> +#include <set> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "ConvertHeartBeat.h" +#include "PayloadSerializer.h" +#include "utils/ByteArrayCallback.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +void ConvertHeartBeat::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + auto ff = session.get(); + if (ff != nullptr){ + logger_->log_error("ConvertHeartBeat does not receive flow files"); + session->rollback(); + } + if (nullptr == mqtt_service_) { + context->yield(); + return; + } + std::vector<uint8_t> heartbeat; + bool received_heartbeat = false; + // while we have heartbeats we can continue to loop. + while (mqtt_service_->get(100, listening_topic, heartbeat)) { + if (heartbeat.size() > 0) { + c2::C2Payload payload = c2::mqtt::PayloadSerializer::deserialize(heartbeat); + auto serialized = serializeJsonRootPayload(payload); + logger_->log_debug("Converted JSON output %s", serialized); + minifi::utils::StreamOutputCallback byteCallback(serialized.size() + 1); + byteCallback.write(const_cast<char*>(serialized.c_str()), serialized.size()); + auto newff = session->create(); + session->write(newff, &byteCallback); + session->transfer(newff, Success); + received_heartbeat = true; + } else { + break; + } + } + if (!received_heartbeat) { + context->yield(); + } + +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertHeartBeat.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConvertHeartBeat.h 
b/extensions/mqtt/processors/ConvertHeartBeat.h new file mode 100644 index 0000000..ad7f37a --- /dev/null +++ b/extensions/mqtt/processors/ConvertHeartBeat.h @@ -0,0 +1,78 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONVERT_HEARTBEAT_H__ +#define __CONVERT_HEARTBEAT_H__ + +#include "MQTTControllerService.h" +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "c2/protocols/RESTProtocol.h" +#include "ConvertBase.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +/* + * Purpose: ConvertHeartBeat converts heatbeats into MQTT messages. + */ +class ConvertHeartBeat: public ConvertBase{ +public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit ConvertHeartBeat(std::string name, uuid_t uuid = NULL) + : ConvertBase(name, uuid), logger_(logging::LoggerFactory<ConvertHeartBeat>::getLogger()) { + } + // Destructor + virtual ~ConvertHeartBeat() { + } + // Processor Name + static constexpr char const* ProcessorName = "ConvertHeartBeat"; + +public: + /** + * Function that's executed when the processor is triggered. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + + virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; + +private: + std::shared_ptr<logging::Logger> logger_; +}; + + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertJSONAck.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConvertJSONAck.cpp b/extensions/mqtt/processors/ConvertJSONAck.cpp new file mode 100644 index 0000000..6ea3eaa --- /dev/null +++ b/extensions/mqtt/processors/ConvertJSONAck.cpp @@ -0,0 +1,115 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ConvertJSONAck.h" + +#include <stdio.h> +#include <algorithm> +#include <memory> +#include <string> +#include <map> +#include <set> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "PayloadSerializer.h" +#include "utils/ByteArrayCallback.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + + +std::string ConvertJSONAck::parseTopicName(const std::string &json) { + std::string topic; + rapidjson::Document root; + + try { + rapidjson::ParseResult ok = root.Parse(json.c_str()); + if (ok) { + if (root.HasMember("agentInfo")) { + if (root["agentInfo"].HasMember("identifier")) { + std::stringstream topicStr; + topicStr << root["agentInfo"]["identifier"].GetString() << "/in"; + return topicStr.str(); + } + } + } + } catch (...) { + + } + return topic; +} +void ConvertJSONAck::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + if (nullptr == mqtt_service_) { + context->yield(); + return; + } + auto flow = session->get(); + + if (!flow) { + return; + } + + /** + * This processor expects a JSON response from InvokeHTTP and thus we expect a heartbeat ack following that. 
+ * Since we are trailing InvokeHTTP + */ + std::string topic; + { + // expect JSON response from InvokeHTTP and thus we expect a heartbeat and then the output from the HTTP + c2::C2Payload response_payload(c2::Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true, true); + ReadCallback callback; + session->read(flow, &callback); + + topic = parseTopicName(std::string(callback.buffer_.data(), callback.buffer_.size())); + + session->transfer(flow, Success); + + } + flow = session->get(); + + if (!flow) { + return; + } + + if (!topic.empty()) { + ReadCallback callback; + session->read(flow, &callback); + + c2::C2Payload response_payload(c2::Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true, true); + + std::string str(callback.buffer_.data(),callback.buffer_.size()); + auto payload = parseJsonResponse(response_payload, callback.buffer_); + + auto stream = c2::mqtt::PayloadSerializer::serialize(payload); + + mqtt_service_->send(topic, stream->getBuffer(), stream->getSize()); + + } + + session->transfer(flow, Success); + +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertJSONAck.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h new file mode 100644 index 0000000..e7331c8 --- /dev/null +++ b/extensions/mqtt/processors/ConvertJSONAck.h @@ -0,0 +1,105 @@ +/** + * @file ConvertAck.h + * ConvertAck class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONVERT_ACKNOWLEDGEMENT_H__ +#define __CONVERT_ACKNOWLEDGEMENT_H__ + +#include "MQTTControllerService.h" +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "c2/protocols/RESTProtocol.h" +#include "ConvertBase.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +/** + * Purpose: Converts JSON acks into an MQTT consumable by + * MQTTC2Protocol. + */ +class ConvertJSONAck : public ConvertBase { + public: + // Constructor + /*! + * Create a new processor + */ + explicit ConvertJSONAck(std::string name, uuid_t uuid = NULL) + : ConvertBase(name, uuid), + logger_(logging::LoggerFactory<ConvertJSONAck>::getLogger()) { + } + // Destructor + virtual ~ConvertJSONAck() { + } + // Processor Name + static constexpr char const* ProcessorName = "ConvertJSONAck"; + + + public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. 
+ */ + + virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; + + protected: + + class ReadCallback : public InputStreamCallback { + public: + ReadCallback() { + } + ~ReadCallback() { + } + int64_t process(std::shared_ptr<io::BaseStream> stream) { + int64_t ret = 0; + if (nullptr == stream) + return 0; + buffer_.resize(stream->getSize()); + ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->getSize()); + return ret; + } + std::vector<char> buffer_; + }; + + /** + * Parse Topic name from the json -- given a known structure that we expect. + * @param json json representation defined by the restful protocol + */ + std::string parseTopicName(const std::string &json); + private: + std::shared_ptr<logging::Logger> logger_; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertUpdate.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp b/extensions/mqtt/processors/ConvertUpdate.cpp new file mode 100644 index 0000000..264a4e0 --- /dev/null +++ b/extensions/mqtt/processors/ConvertUpdate.cpp @@ -0,0 +1,106 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ConvertUpdate.h" +#include "utils/HTTPClient.h" +#include "io/BaseStream.h" +#include "io/DataStream.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property ConvertUpdate::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", ""); + +void ConvertUpdate::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + if (nullptr == mqtt_service_) { + context->yield(); + return; + } + std::vector<uint8_t> update; + bool received_update = false; + while (mqtt_service_->get(100, listening_topic, update)) { + // first we have the input topic string followed by the update URI + if (update.size() > 0) { + + io::DataStream dataStream(update.data(), update.size()); + io::BaseStream stream(&dataStream); + + std::string returnTopic, url; + + if (returnTopic.empty() || url.empty()) { + logger_->log_debug("topic and/or URL are empty"); + break; + } + + stream.readUTF(returnTopic); + stream.readUTF(url); + + /** + * Not having curl support is actually okay for MQTT to be built, but running the update processor requires + * that we have curl available. + */ + auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient"); + if (nullptr == client_ptr) { + logger_->log_error("Could not locate HTTPClient. 
You do not have cURL support!"); + return; + } + std::unique_ptr<utils::BaseHTTPClient> client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr)); + client->initialize("GET"); + client->setConnectionTimeout(2000); + client->setReadTimeout(2000); + + if (client->submit()) { + auto data = client->getResponseBody(); + std::vector<uint8_t> raw_data; + std::transform(std::begin(data), std::end(data), std::back_inserter(raw_data), [](char c) { + return (uint8_t)c; + }); + mqtt_service_->send(returnTopic, raw_data); + } + + received_update = true; + } else { + break; + } + } + + if (!received_update) { + context->yield(); + } + +} + +void ConvertUpdate::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(MQTTControllerService); + properties.insert(ListeningTopic); + properties.insert(SSLContext); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertUpdate.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/ConvertUpdate.h b/extensions/mqtt/processors/ConvertUpdate.h new file mode 100644 index 0000000..a04529a --- /dev/null +++ b/extensions/mqtt/processors/ConvertUpdate.h @@ -0,0 +1,91 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_MQTT_PROTOCOL_CONVERTUPDATE_H_ +#define EXTENSIONS_MQTT_PROTOCOL_CONVERTUPDATE_H_ + + +#include "MQTTControllerService.h" +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "c2/protocols/RESTProtocol.h" +#include "ConvertBase.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +/** + * Purpose: Converts update messages into the appropriate Restful call + * + * Justification: The other protocol classes are responsible for standard messaging, which carries most + * heartbeat related activity; however, updates generally connect to a different source. This processor will + * retrieve the updates and respond via MQTT. + */ +class ConvertUpdate : public ConvertBase { + public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit ConvertUpdate(std::string name, uuid_t uuid = NULL) + : ConvertBase(name, uuid), logger_(logging::LoggerFactory<ConvertUpdate>::getLogger()) { + } + // Destructor + virtual ~ConvertUpdate() { + } + + static core::Property SSLContext; + // Processor Name + static constexpr char const* ProcessorName = "ConvertUpdate"; + +public: + + /** + * Initialization of the processor + */ + virtual void initialize() override; + /** + * Function that's executed when the processor is triggered. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + + virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; + +protected: + std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; +private: + std::shared_ptr<logging::Logger> logger_; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* EXTENSIONS_MQTT_PROTOCOL_CONVERTUPDATE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/PublishMQTT.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp new file mode 100644 index 0000000..411cc2d --- /dev/null +++ b/extensions/mqtt/processors/PublishMQTT.cpp @@ -0,0 +1,106 @@ +/** + * @file PublishMQTT.cpp + * PublishMQTT class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "PublishMQTT.h" +#include <stdio.h> +#include <algorithm> +#include <memory> +#include <string> +#include <map> +#include <set> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property PublishMQTT::Retain("Retain", "Retain MQTT published record in broker", "false"); +core::Property PublishMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); + +void PublishMQTT::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + properties.insert(Retain); + properties.insert(MaxFlowSegSize); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory) { + AbstractMQTTProcessor::onSchedule(context, sessionFactory); + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { + max_seg_size_ = valInt; + logger_->log_debug("PublishMQTT: max flow segment size [%ll]", max_seg_size_); + } + value = ""; + if (context->getProperty(Retain.getName(), value) && !value.empty() && org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, retain_)) { + logger_->log_debug("PublishMQTT: Retain [%d]", retain_); + } +} + +void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + std::shared_ptr<core::FlowFile> flowFile = session->get(); + + if (!flowFile) { + return; + } + + if (!reconnect()) { + logger_->log_error("MQTT connect to %s failed", uri_); + session->transfer(flowFile, Failure); + return; + } + + PublishMQTT::ReadCallback callback(flowFile->getSize(), max_seg_size_, topic_, client_, qos_, retain_, delivered_token_); + session->read(flowFile, &callback); + if (callback.status_ < 0) { + logger_->log_error("Failed to send flow to MQTT topic %s", topic_); + session->transfer(flowFile, Failure); + } else { + logger_->log_debug("Sent flow with length %d to MQTT topic %s", callback.read_size_, topic_); + session->transfer(flowFile, Success); + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/PublishMQTT.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h new file mode 100644 index 0000000..67a0d7f --- /dev/null +++ b/extensions/mqtt/processors/PublishMQTT.h @@ -0,0 +1,142 @@ +/** + * @file PublishMQTT.h + * 
PublishMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PUBLISH_MQTT_H__ +#define __PUBLISH_MQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// PublishMQTT Class +class PublishMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit PublishMQTT(std::string name, uuid_t uuid = NULL) + : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) { + retain_ = false; + max_seg_size_ = ULLONG_MAX; + } + // Destructor + virtual ~PublishMQTT() { + } + // Processor Name + static constexpr char const* ProcessorName = "PublishMQTT"; + // Supported Properties + static core::Property Retain; + static core::Property MaxFlowSegSize; + + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: + ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client, + int qos, bool retain, MQTTClient_deliveryToken &token) : + flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client), + qos_(qos), retain_(retain), token_(token) { + status_ = 0; + read_size_ = 0; + } + ~ReadCallback() { + } + int64_t process(std::shared_ptr<io::BaseStream> stream) { + if (flow_size_ < max_seg_size_) + max_seg_size_ = flow_size_; + std::vector<unsigned char> buffer; + buffer.reserve(max_seg_size_); + read_size_ = 0; + status_ = 0; + while (read_size_ < flow_size_) { + int readRet = stream->read(&buffer[0], max_seg_size_); + if (readRet < 0) { + status_ = -1; + return read_size_; + } + if (readRet > 0) { + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = &buffer[0]; + pubmsg.payloadlen = readRet; + pubmsg.qos = qos_; + pubmsg.retained = retain_; + if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) { + status_ = -1; + return -1; + } + read_size_ += readRet; + } else { + break; + } + } + return read_size_; + } + uint64_t flow_size_; + uint64_t max_seg_size_; + std::string key_; + MQTTClient client_;; + int status_; + size_t read_size_; + int qos_; + int retain_; + MQTTClient_deliveryToken &token_; + }; + +public: + /** + * Function that's executed when the processor is scheduled. 
+ * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi PublishMQTT + virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session); + // Initialize, over write by NiFi PublishMQTT + virtual void initialize(void); + +protected: + +private: + uint64_t max_seg_size_; + bool retain_; + std::shared_ptr<logging::Logger> logger_; +}; + +REGISTER_RESOURCE (PublishMQTT); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/protocol/MQTTC2Protocol.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.cpp b/extensions/mqtt/protocol/MQTTC2Protocol.cpp new file mode 100644 index 0000000..03a6c8d --- /dev/null +++ b/extensions/mqtt/protocol/MQTTC2Protocol.cpp @@ -0,0 +1,103 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "MQTTC2Protocol.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +MQTTC2Protocol::MQTTC2Protocol(std::string name, uuid_t uuid) + : C2Protocol(name, uuid), + logger_(logging::LoggerFactory<Connectable>::getLogger()) { +} + +MQTTC2Protocol::~MQTTC2Protocol() { +} + +void MQTTC2Protocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { + if (configure->get("nifi.c2.mqtt.connector.service", controller_service_name_)) { + auto service = controller->getControllerService(controller_service_name_); + mqtt_service_ = std::static_pointer_cast<controllers::MQTTControllerService>(service); + } else + mqtt_service_ = nullptr; + + agent_identifier_ = configure->getAgentIdentifier(); + + std::stringstream outputStream; + std::string updateTopicOpt, heartbeatTopicOpt; + if (configure->get("nifi.c2.mqtt.heartbeat.topic", heartbeatTopicOpt)) { + heartbeat_topic_ = heartbeatTopicOpt; + } else { + heartbeat_topic_ = "heartbeats"; // outputStream.str(); + } + if (configure->get("nifi.c2.mqtt.update.topic", updateTopicOpt)) { + update_topic_ = updateTopicOpt; + } else { + update_topic_ = "updates"; + } + + std::stringstream inputStream; + inputStream << agent_identifier_ << "/in"; + in_topic_ = inputStream.str(); + + if (mqtt_service_) { + mqtt_service_->subscribeToTopic(in_topic_); + } +} + +C2Payload MQTTC2Protocol::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) { + // we are getting an update. 
+ std::lock_guard<std::mutex> lock(input_mutex_); + io::BaseStream stream; + stream.writeUTF(in_topic_); + stream.writeUTF(url); + std::vector<uint8_t> response; + auto transmit_id = mqtt_service_->send(update_topic_, stream.getBuffer(), stream.getSize()); + if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, in_topic_, response)) { + C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true); + response_payload.setRawData(response); + return response_payload; + } else { + return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true); + } +} + +C2Payload MQTTC2Protocol::serialize(const C2Payload &payload) { + if (mqtt_service_ == nullptr || !mqtt_service_->isRunning()) { + return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); + } + + std::lock_guard<std::mutex> lock(input_mutex_); + + auto stream = c2::mqtt::PayloadSerializer::serialize(payload); + + auto transmit_id = mqtt_service_->send(heartbeat_topic_, stream->getBuffer(), stream->getSize()); + std::vector<uint8_t> response; + if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, in_topic_, response)) { + return c2::mqtt::PayloadSerializer::deserialize(response); + } + return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/protocol/MQTTC2Protocol.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.h b/extensions/mqtt/protocol/MQTTC2Protocol.h new file mode 100644 index 0000000..460e5d4 --- /dev/null +++ b/extensions/mqtt/protocol/MQTTC2Protocol.h @@ -0,0 +1,96 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_MQTT_PROTOCOL_MQTTC2PROTOCOL_H_ +#define EXTENSIONS_MQTT_PROTOCOL_MQTTC2PROTOCOL_H_ + +#include <algorithm> +#include <iostream> +#include <memory> +#include <utility> +#include <map> +#include <string> +#include <vector> + +#include "../controllerservice/MQTTControllerService.h" +#include "c2/C2Protocol.h" +#include "io/BaseStream.h" +#include "agent/agent_version.h" +#include "PayloadSerializer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose: Implementation of the MQTT C2 protocol. Serializes messages to and from + * and mqtt server. + */ +class MQTTC2Protocol : public C2Protocol { + public: + explicit MQTTC2Protocol(std::string name, uuid_t uuid = nullptr); + + virtual ~MQTTC2Protocol(); + + /** + * Consume the payload. + * @param url to evaluate. + * @param payload payload to consume. + * @direction direction of operation. + */ + virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override; + + virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override { + return serialize(payload); + } + + virtual void update(const std::shared_ptr<Configure> &configure) override { + // no op. 
+ } + + virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override; + + protected: + + C2Payload serialize(const C2Payload &payload); + + std::mutex input_mutex_; + // input topic on which we will listen. + std::string in_topic_; + // agent identifier + std::string agent_identifier_; + // heartbeat topic name. + std::string heartbeat_topic_; + // update topic name. + std::string update_topic_; + + // mqtt controller service reference. + std::shared_ptr<controllers::MQTTControllerService> mqtt_service_; + std::shared_ptr<logging::Logger> logger_; + //mqtt controller serviec name. + std::string controller_service_name_; + + +}; +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif /* EXTENSIONS_MQTT_PROTOCOL_MQTTC2PROTOCOL_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/protocol/PayloadSerializer.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/extensions/mqtt/protocol/PayloadSerializer.cpp new file mode 100644 index 0000000..5f62227 --- /dev/null +++ b/extensions/mqtt/protocol/PayloadSerializer.cpp @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "PayloadSerializer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { +namespace mqtt { + +PayloadSerializer::PayloadSerializer() { +} + +PayloadSerializer::~PayloadSerializer() { +} + +} /* namespace mqtt */ +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/protocol/PayloadSerializer.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/protocol/PayloadSerializer.h b/extensions/mqtt/protocol/PayloadSerializer.h new file mode 100644 index 0000000..27b3362 --- /dev/null +++ b/extensions/mqtt/protocol/PayloadSerializer.h @@ -0,0 +1,318 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef EXTENSIONS_MQTT_PROTOCOL_PAYLOADSERIALIZER_H_ +#define EXTENSIONS_MQTT_PROTOCOL_PAYLOADSERIALIZER_H_ + +#include "c2/C2Protocol.h" +#include "io/BaseStream.h" +#include "core/state/Value.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { +namespace mqtt { + +class PayloadSerializer { + public: + + /** + * Static function that serializes the value nodes + */ + static void serializeValueNode(state::response::ValueNode &value, std::shared_ptr<io::BaseStream> stream) { + auto base_type = value.getValue(); + uint8_t type = 0x00; + if (auto sub_type = std::dynamic_pointer_cast<state::response::IntValue>(base_type)) { + type = 1; + stream->write(&type, 1); + uint32_t value = sub_type->getValue(); + stream->write(value); + } else if (auto sub_type = std::dynamic_pointer_cast<state::response::Int64Value>(base_type)) { + type = 2; + stream->write(&type, 1); + uint64_t value = sub_type->getValue(); + stream->write(value); + } else if (auto sub_type = std::dynamic_pointer_cast<state::response::BoolValue>(base_type)) { + type = 3; + stream->write(&type, 1); + if (sub_type->getValue()) { + type = 1; + + } else + type = 0; + stream->write(&type, 1); + } else { + auto str = base_type->getStringValue(); + type = 4; + stream->write(&type, 1); + stream->writeUTF(str); + } + } + static void serialize(uint8_t op, const C2Payload &payload, std::shared_ptr<io::BaseStream> stream) { + uint8_t st; + uint32_t size = payload.getNestedPayloads().size(); + stream->write(size); + for (auto nested_payload : payload.getNestedPayloads()) { + op = opToInt(nested_payload.getOperation()); + stream->write(&op, 1); + stream->write(&st, 1); + stream->writeUTF(nested_payload.getLabel()); + stream->writeUTF(nested_payload.getIdentifier()); + const std::vector<C2ContentResponse> &content = nested_payload.getContent(); + size = content.size(); 
+ stream->write(size); + for (const auto &payload_content : content) { + stream->writeUTF(payload_content.name); + size = payload_content.operation_arguments.size(); + stream->write(size); + for (auto content : payload_content.operation_arguments) { + stream->writeUTF(content.first); + serializeValueNode(content.second, stream); + } + } + if (nested_payload.getNestedPayloads().size() > 0) { + serialize(op, nested_payload, stream); + } else { + size = 0; + stream->write(size); + } + } + } + + static uint8_t opToInt(const Operation opt) { + uint8_t op; + + switch (opt) { + case Operation::ACKNOWLEDGE: + op = 1; + break; + case Operation::HEARTBEAT: + op = 2; + break; + case Operation::RESTART: + op = 3; + break; + case Operation::DESCRIBE: + op = 4; + break; + case Operation::STOP: + op = 5; + break; + case Operation::START: + op = 6; + break; + case Operation::UPDATE: + op = 7; + break; + default: + op = 2; + break; + } + return op; + } + static std::shared_ptr<io::BaseStream> serialize(const C2Payload &payload) { + std::shared_ptr<io::BaseStream> stream = std::make_shared<io::BaseStream>(); + uint8_t op, st = 0; + op = opToInt(payload.getOperation()); + stream->write(&op, 1); + if (payload.getStatus().getState() == state::UpdateState::NESTED) { + st = 1; + stream->write(&st, 1); + } else { + st = 0; + stream->write(&st, 1); + } + stream->writeUTF(payload.getLabel()); + + stream->writeUTF(payload.getIdentifier()); + const std::vector<C2ContentResponse> &content = payload.getContent(); + uint32_t size = content.size(); + stream->write(size); + for (const auto &payload_content : content) { + stream->writeUTF(payload_content.name); + size = payload_content.operation_arguments.size(); + stream->write(size); + for (auto content : payload_content.operation_arguments) { + stream->writeUTF(content.first); + serializeValueNode(content.second, stream); + } + } + serialize(op, payload, stream); + return stream; + } + + static state::response::ValueNode 
deserializeValueNode(io::BaseStream *stream) { + uint8_t type = 0; + stream->read(&type, 1); + state::response::ValueNode node; + switch (type) { + case 1: + uint32_t thb; + stream->read(thb); + node = thb; + break; + case 2: + uint64_t base; + stream->read(base); + node = base; + break; + case 3: + stream->read(&type, 1); + if (type == 1) + node = true; + else + node = false; + break; + default: + case 4: + std::string str; + stream->readUTF(str); + node = str; + } + return node; + } + static C2Payload deserialize(std::vector<uint8_t> data) { + C2Payload payload(Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true); + if (deserialize(data, payload)) { + return payload; + } + return C2Payload(Operation::HEARTBEAT, state::UpdateState::READ_ERROR, true); + } + /** + * Deserializes the payloads + * @param parent payload to deserialize. + * @param operation of parent payload + * @param identifier for this payload + * @param stream base stream in which we will serialize the parent payload. + */ + static bool deserializePayload(C2Payload &parent, Operation operation, std::string identifier, io::BaseStream *stream) { + uint32_t payloads = 0; + stream->read(payloads); + uint8_t op, st; + std::string label; + for (size_t i = 0; i < payloads; i++) { + stream->read(op); + stream->read(st); + stream->readUTF(label); + stream->readUTF(identifier); + operation = intToOp(op); + C2Payload subPayload(operation, st == 1 ? 
state::UpdateState::NESTED : state::UpdateState::READ_COMPLETE); + subPayload.setIdentifier(identifier); + subPayload.setLabel(label); + uint32_t content_size = 0; + stream->read(content_size); + for (uint32_t i = 0; i < content_size; i++) { + std::string content_name; + uint32_t args = 0; + C2ContentResponse content(operation); + stream->readUTF(content_name); + content.name = content_name; + stream->read(args); + for (uint32_t j = 0; j < args; j++) { + std::string first, second; + stream->readUTF(first); + content.operation_arguments[first] = deserializeValueNode(stream); + } + subPayload.addContent(std::move(content)); + } + deserializePayload(subPayload, operation, identifier, stream); + parent.addPayload(std::move(subPayload)); + + } + return true; + } + static bool deserialize(std::vector<uint8_t> data, C2Payload &payload) { + io::DataStream dataStream(data.data(), data.size()); + io::BaseStream stream(&dataStream); + + uint8_t op, st = 0; + ; + + std::string identifier, label; + // read op + stream.read(op); + stream.read(st); + stream.readUTF(label); + stream.readUTF(identifier); + + Operation operation = intToOp(op); + + C2Payload newPayload(operation, st == 1 ? 
state::UpdateState::NESTED : state::UpdateState::READ_COMPLETE); + newPayload.setIdentifier(identifier); + newPayload.setLabel(label); + + uint32_t content_size = 0; + stream.read(content_size); + for (size_t i = 0; i < content_size; i++) { + std::string content_name; + uint32_t args = 0; + C2ContentResponse content(operation); + stream.readUTF(content_name); + content.name = content_name; + stream.read(args); + for (uint32_t j = 0; j < args; j++) { + std::string first, second; + stream.readUTF(first); + //stream.readUTF(second); + content.operation_arguments[first] = deserializeValueNode(&stream); + } + newPayload.addContent(std::move(content)); + } + + deserializePayload(newPayload, operation, identifier, &stream); + // we're finished + payload = std::move(newPayload); + return true; + } + private: + + static Operation intToOp(int op) { + switch (op) { + case 1: + return Operation::ACKNOWLEDGE; + case 2: + return Operation::HEARTBEAT; + case 3: + return Operation::RESTART; + case 4: + return Operation::DESCRIBE; + case 5: + return Operation::STOP; + case 6: + return Operation::START; + case 7: + return Operation::UPDATE; + default: + return Operation::HEARTBEAT; + ; + } + + } + PayloadSerializer(); + virtual ~PayloadSerializer(); +}; + +} /* namespace mqtt */ +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* EXTENSIONS_MQTT_PROTOCOL_PAYLOADSERIALIZER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/rocksdb-repos/DatabaseContentRepository.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index d469569..49e7227 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -41,7 +41,9 @@ class StringAppender : public 
rocksdb::AssociativeMergeOperator { virtual bool Merge(const rocksdb::Slice& key, const rocksdb::Slice* existing_value, const rocksdb::Slice& value, std::string* new_value, rocksdb::Logger* logger) const { // Clear the *new_value for writing. - assert(new_value); + if (nullptr == new_value) { + return false; + } new_value->clear(); if (!existing_value) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/c2/C2Payload.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h index 4d4b0ad..cbe2cb1 100644 --- a/libminifi/include/c2/C2Payload.h +++ b/libminifi/include/c2/C2Payload.h @@ -165,6 +165,11 @@ class C2Payload : public state::Update { void setRawData(const std::vector<char> &data); /** + * Sets raw data from a vector of uint8_t within this object. + */ + void setRawData(const std::vector<uint8_t> &data); + + /** * Returns raw data. */ std::vector<char> getRawData() const; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/core/controller/ControllerService.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerService.h b/libminifi/include/core/controller/ControllerService.h index 8f52772..b754aa9 100644 --- a/libminifi/include/core/controller/ControllerService.h +++ b/libminifi/include/core/controller/ControllerService.h @@ -125,8 +125,13 @@ class ControllerService : public ConfigurableComponent, public Connectable { return false; } + void setLinkedControllerServices( const std::vector<std::shared_ptr<controller::ControllerService> > &services ){ + linked_services_ = services; + } + protected: + std::vector<std::shared_ptr<controller::ControllerService> > linked_services_; std::shared_ptr<Configure> configuration_; std::atomic<ControllerServiceState> current_state_; virtual bool canEdit() { 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/core/controller/StandardControllerServiceProvider.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h index 4d70757..678f3f5 100644 --- a/libminifi/include/core/controller/StandardControllerServiceProvider.h +++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h @@ -110,7 +110,7 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ } virtual void enableAllControllerServices() { - logger_->log_info("Enabling %ll controller services", controller_map_->getAllControllerServices().size()); + logger_->log_info("Enabling %zu controller services", controller_map_->getAllControllerServices().size()); /* size() yields a size_t, which %zu matches; %u expects unsigned int */ for (auto service : controller_map_->getAllControllerServices()) { if (service->canEnable()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 66a9351..3646967 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -20,6 +20,7 @@ #ifndef __CONFIGURE_H__ #define __CONFIGURE_H__ +#include <mutex> #include "properties/Properties.h" namespace org { @@ -29,6 +30,15 @@ namespace minifi { class Configure : public Properties { public: + + void setAgentIdentifier(const std::string &identifier) { /* stores identifier in agent_identifier_ while holding mutex_ */ + std::lock_guard<std::mutex> lock(mutex_); + agent_identifier_ = identifier; + } + std::string getAgentIdentifier() { /* returns a copy of agent_identifier_, taken while holding mutex_ */ + std::lock_guard<std::mutex> lock(mutex_); + return agent_identifier_; + } // nifi.flow.configuration.file static const char *nifi_default_directory; static const char *nifi_c2_enable; @@ 
-70,6 +80,10 @@ class Configure : public Properties { // nifi rest api user name and password static const char *nifi_rest_api_user_name; static const char *nifi_rest_api_password; + + private: + std::string agent_identifier_; /* value written by setAgentIdentifier() and returned by getAgentIdentifier() */ + std::mutex mutex_; /* locked by both agent identifier accessors */ }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/utils/ByteArrayCallback.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h index 653159c..49249a7 100644 --- a/libminifi/include/utils/ByteArrayCallback.h +++ b/libminifi/include/utils/ByteArrayCallback.h @@ -92,9 +92,9 @@ class ByteOutputCallback : public OutputStreamCallback { public: ByteOutputCallback() = delete; - explicit ByteOutputCallback(size_t max_size, bool wait_on_read=false) + explicit ByteOutputCallback(size_t max_size, bool wait_on_read = false) : max_size_(max_size), - read_started_( wait_on_read ? false : true ), + read_started_(wait_on_read ? 
false : true), logger_(logging::LoggerFactory<ByteOutputCallback>::getLogger()) { current_str_pos = 0; size_ = 0; @@ -149,6 +149,18 @@ class ByteOutputCallback : public OutputStreamCallback { }; +class StreamOutputCallback : public ByteOutputCallback { /* ByteOutputCallback subclass declaring its own write() and process(); the constructor forwards both arguments to the base unchanged */ + public: + explicit StreamOutputCallback(size_t max_size, bool wait_on_read = false) + : ByteOutputCallback(max_size, wait_on_read) { + + } + + virtual void write(char *data, size_t size); + + virtual int64_t process(std::shared_ptr<io::BaseStream> stream); +}; + } /* namespace utils */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 1414765..688c276 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -360,6 +360,13 @@ void FlowController::initializeC2() { } else { c2_enabled_ = true; } + + std::string identifier_str; /* resolved agent identifier: the configured value when present and non-empty, otherwise uuidStr_ */ + if (!configuration_->get("nifi.c2.agent.identifier", identifier_str) || identifier_str.empty()) { + // set to the flow controller's identifier + identifier_str = uuidStr_; + } + configuration_->setAgentIdentifier(identifier_str); /* record the resolved identifier on the configuration object */ state::StateManager::initialize(); std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()), @@ -409,13 +416,8 @@ void FlowController::initializeC2() { auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(processor); if (identifier != nullptr) { - std::string identifier_str; - if (configuration_->get("nifi.c2.agent.identifier", identifier_str) && !identifier_str.empty()) { - identifier->setIdentifier(identifier_str); - } else { - // set to the flow controller's identifier - identifier->setIdentifier(uuidStr_); - } + + 
identifier->setIdentifier(identifier_str); std::string class_str; if (configuration_->get("nifi.c2.agent.class", class_str) && !class_str.empty()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/c2/C2Agent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 077aefe..b6d2825 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -108,6 +108,7 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf auto protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw(clazz, clazz); if (protocol == nullptr) { + logger_->log_info("Class %s not found", clazz); protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw("RESTSender", "RESTSender"); if (!protocol) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/c2/C2Payload.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp index 3807b12..cec2a7e 100644 --- a/libminifi/src/c2/C2Payload.cpp +++ b/libminifi/src/c2/C2Payload.cpp @@ -178,6 +178,13 @@ void C2Payload::setRawData(const std::vector<char> &data) { raw_data_.insert(std::end(raw_data_), std::begin(data), std::end(data)); } +void C2Payload::setRawData(const std::vector<uint8_t> &data) { /* appends every byte of data to raw_data_ converted to char; existing contents are kept, as in the char overload */ + std::transform(std::begin(data), std::end(data), std::back_inserter(raw_data_),[](uint8_t c){ + return static_cast<char>(c); /* named cast in place of the C-style cast */ + }); +} + + std::vector<char> C2Payload::getRawData() const { return raw_data_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/core/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index fde2bc6..9b732dc 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ 
b/libminifi/src/core/ProcessSession.cpp @@ -830,7 +830,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() { std::shared_ptr<Connectable> first = process_context_->getProcessorNode()->getNextIncomingConnection(); if (first == NULL) { - logger_->log_debug("Get is null for %s", process_context_->getProcessorNode()->getName()); + logger_->log_trace("Get is null for %s", process_context_->getProcessorNode()->getName()); return NULL; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/core/controller/StandardControllerServiceNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp index 5c4aa70..ae043da 100644 --- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp +++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp @@ -53,6 +53,12 @@ bool StandardControllerServiceNode::enable() { } std::shared_ptr<ControllerService> impl = getControllerServiceImplementation(); if (nullptr != impl) { + std::lock_guard<std::mutex> lock(mutex_); + std::vector<std::shared_ptr<ControllerService> > services; /* implementations of the linked services, handed to impl before it is enabled */ + for (const auto &service : linked_controller_services_) { /* bind by const reference so the element is not copied on every iteration */ + services.push_back ( service->getControllerServiceImplementation()); + } + impl->setLinkedControllerServices( services ); impl->onEnable(); } return true; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/utils/ByteArrayCallback.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp index ed9ffb7..b0c0dbc 100644 --- a/libminifi/src/utils/ByteArrayCallback.cpp +++ b/libminifi/src/utils/ByteArrayCallback.cpp @@ -37,6 +37,21 @@ int64_t ByteOutputCallback::process(std::shared_ptr<io::BaseStream> stream) { return size_.load(); } +int64_t
StreamOutputCallback::process(std::shared_ptr<io::BaseStream> stream) { /* seeks stream to 0, writes the bytes obtained from readFully() into it, and returns stream->getSize() */ + stream->seek(0); + std::unique_ptr<char[]> buffer(new char[size_.load()]); /* array form of unique_ptr so the new[] allocation is released with delete[] */ + auto written = readFully(buffer.get(), size_); + stream->writeData(reinterpret_cast<uint8_t*>(buffer.get()), written); + return stream->getSize(); + +} + +void StreamOutputCallback::write(char *data, size_t size) { /* does nothing when is_alive_ is false; otherwise forwards to write_and_notify() */ + if (!is_alive_) + return; + write_and_notify(data, size); +} + const std::vector<char> ByteOutputCallback::to_string() { std::vector<char> buffer; buffer.resize(size_.load());
