Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 30df9a5ef -> 8dd7e91f9


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConsumeMQTT.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp 
b/extensions/mqtt/processors/ConsumeMQTT.cpp
new file mode 100644
index 0000000..472d35f
--- /dev/null
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -0,0 +1,119 @@
+/**
+ * @file ConsumeMQTT.cpp
+ * ConsumeMQTT class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConsumeMQTT.h"
+#include <stdio.h>
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <map>
+#include <set>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum 
flow content payload segment size for the MQTT record", "");
+core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", 
"Maximum number of messages allowed on the received MQTT queue", "");
+
+void ConsumeMQTT::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  properties.insert(MaxFlowSegSize);
+  properties.insert(QueueBufferMaxMessage);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+  if (queue_.size_approx() >= maxQueueSize_) {
+    logger_->log_debug("MQTT queue full");
+    return false;
+  } else {
+    if (message->payloadlen > maxSegSize_)
+      message->payloadlen = maxSegSize_;
+    queue_.enqueue(message);
+    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
+    return true;
+  }
+}
+
+void ConsumeMQTT::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  AbstractMQTTProcessor::onSchedule(context, sessionFactory);
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(QueueBufferMaxMessage.getName(), value) && 
!value.empty() && core::Property::StringToInt(value, valInt)) {
+    maxQueueSize_ = valInt;
+    logger_->log_debug("ConsumeMQTT: Queue Max Message [%ll]", maxQueueSize_);
+  }
+  value = "";
+  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() 
&& core::Property::StringToInt(value, valInt)) {
+    maxSegSize_ = valInt;
+    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%ll]", 
maxSegSize_);
+  }
+}
+
+void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
+  // reconnect if necessary
+  reconnect();
+  std::deque<MQTTClient_message *> msg_queue;
+  getReceivedMQTTMsg(msg_queue);
+  while (!msg_queue.empty()) {
+    MQTTClient_message *message = msg_queue.front();
+    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
+    ConsumeMQTT::WriteCallback callback(message);
+    session->write(processFlowFile, &callback);
+    if (callback.status_ < 0) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
+      session->remove(processFlowFile);
+    } else {
+      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, 
uri_.c_str());
+      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, 
topic_.c_str());
+      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
+      session->transfer(processFlowFile, Success);
+    }
+    MQTTClient_freeMessage(&message);
+    msg_queue.pop_front();
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConsumeMQTT.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h 
b/extensions/mqtt/processors/ConsumeMQTT.h
new file mode 100644
index 0000000..0b26d42
--- /dev/null
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -0,0 +1,125 @@
+/**
+ * @file ConsumeMQTT.h
+ * ConsumeMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONSUME_MQTT_H__
+#define __CONSUME_MQTT_H__
+
+#include <limits>
+#include <deque>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "concurrentqueue.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
+#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
+
+// ConsumeMQTT Class
+class ConsumeMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
+    : processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) {
+    isSubscriber_ = true;
+    maxQueueSize_ = 100;
+    maxSegSize_ = ULLONG_MAX;
+  }
+  // Destructor
+  virtual ~ConsumeMQTT() {
+    MQTTClient_message *message;
+    while (queue_.try_dequeue(message)) {
+      MQTTClient_freeMessage(&message);
+    }
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConsumeMQTT";
+  // Supported Properties
+  static core::Property MaxFlowSegSize;
+  static core::Property QueueBufferMaxMessage;
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+    WriteCallback(MQTTClient_message *message) :
+      message_(message) {
+      status_ = 0;
+    }
+    MQTTClient_message *message_;
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t len = 
stream->write(reinterpret_cast<uint8_t*>(message_->payload), 
message_->payloadlen);
+      if (len < 0)
+        status_ = -1;
+      return len;
+    }
+    int status_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory);
+  // OnTrigger method, implemented by NiFi ConsumeMQTT
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session);
+  // Initialize, over write by NiFi ConsumeMQTT
+  virtual void initialize(void);
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+
+protected:
+  void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
+    MQTTClient_message *message;
+    while (queue_.try_dequeue(message)) {
+      msg_queue.push_back(message);
+    }
+  }
+
+private:
+  std::shared_ptr<logging::Logger> logger_;
+  std::mutex mutex_;
+  uint64_t maxQueueSize_;
+  uint64_t maxSegSize_;
+  moodycamel::ConcurrentQueue<MQTTClient_message *> queue_;
+};
+
+REGISTER_RESOURCE (ConsumeMQTT);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertBase.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConvertBase.cpp 
b/extensions/mqtt/processors/ConvertBase.cpp
new file mode 100644
index 0000000..10571d4
--- /dev/null
+++ b/extensions/mqtt/processors/ConvertBase.cpp
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <map>
+#include <set>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "ConvertBase.h"
+#include "PayloadSerializer.h"
+#include "utils/ByteArrayCallback.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property ConvertBase::MQTTControllerService("MQTT Controller Service", 
"Name of controller service that will be used for MQTT interactivity", "");
+core::Property ConvertBase::ListeningTopic("Listening Topic", "Name of topic 
to listen to", "");
+core::Relationship ConvertBase::Success("success", "All files are routed to 
success");
+
+void ConvertBase::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(MQTTControllerService);
+  properties.insert(ListeningTopic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+void ConvertBase::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  std::string controller_service_name = "";
+  if (context->getProperty(MQTTControllerService.getName(), 
controller_service_name) && !controller_service_name.empty()) {
+    auto service = context->getControllerService(controller_service_name);
+    mqtt_service_ = 
std::static_pointer_cast<controllers::MQTTControllerService>(service);
+  }
+  context->getProperty(ListeningTopic.getName(), listening_topic);
+  if (!listening_topic.empty()) {
+    mqtt_service_->subscribeToTopic(listening_topic);
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertBase.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConvertBase.h 
b/extensions/mqtt/processors/ConvertBase.h
new file mode 100644
index 0000000..6f4a41e
--- /dev/null
+++ b/extensions/mqtt/processors/ConvertBase.h
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_MQTT_PROTOCOL_CONVERTBASE_H_
+#define EXTENSIONS_MQTT_PROTOCOL_CONVERTBASE_H_
+
+#include "MQTTControllerService.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "c2/protocols/RESTProtocol.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Purpose: Provides base functionality for mqtt conversion classes.
+ * At a minimum we need a controller service and listening topic.
+ */
+class ConvertBase : public core::Processor, public minifi::c2::RESTProtocol {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConvertBase(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid) {
+  }
+  // Destructor
+  virtual ~ConvertBase() {
+  }
+  // Supported Properties
+  static core::Property MQTTControllerService;
+  static core::Property ListeningTopic;
+
+  static core::Relationship Success;
+
+ public:
+
+  /**
+   * Initialization of the processor
+   */
+  virtual void initialize() override;
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) 
override;
+ protected:
+
+  /**
+   * MQTT controller service.
+   */
+  std::shared_ptr<controllers::MQTTControllerService> mqtt_service_;
+
+  std::string listening_topic;
+
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_MQTT_PROTOCOL_CONVERTBASE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertHeartBeat.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.cpp 
b/extensions/mqtt/processors/ConvertHeartBeat.cpp
new file mode 100644
index 0000000..cf5574f
--- /dev/null
+++ b/extensions/mqtt/processors/ConvertHeartBeat.cpp
@@ -0,0 +1,75 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <map>
+#include <set>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "ConvertHeartBeat.h"
+#include "PayloadSerializer.h"
+#include "utils/ByteArrayCallback.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+void ConvertHeartBeat::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
+  auto ff = session.get();
+  if (ff != nullptr){
+    logger_->log_error("ConvertHeartBeat does not receive flow files");
+    session->rollback();
+  }
+  if (nullptr == mqtt_service_) {
+    context->yield();
+    return;
+  }
+  std::vector<uint8_t> heartbeat;
+  bool received_heartbeat = false;
+  // while we have heartbeats we can continue to loop.
+  while (mqtt_service_->get(100, listening_topic, heartbeat)) {
+    if (heartbeat.size() > 0) {
+      c2::C2Payload payload = 
c2::mqtt::PayloadSerializer::deserialize(heartbeat);
+      auto serialized = serializeJsonRootPayload(payload);
+      logger_->log_debug("Converted JSON output %s", serialized);
+      minifi::utils::StreamOutputCallback byteCallback(serialized.size() + 1);
+      byteCallback.write(const_cast<char*>(serialized.c_str()), 
serialized.size());
+      auto newff = session->create();
+      session->write(newff, &byteCallback);
+      session->transfer(newff, Success);
+      received_heartbeat = true;
+    } else {
+      break;
+    }
+  }
+  if (!received_heartbeat) {
+    context->yield();
+  }
+
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertHeartBeat.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.h 
b/extensions/mqtt/processors/ConvertHeartBeat.h
new file mode 100644
index 0000000..ad7f37a
--- /dev/null
+++ b/extensions/mqtt/processors/ConvertHeartBeat.h
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONVERT_HEARTBEAT_H__
+#define __CONVERT_HEARTBEAT_H__
+
+#include "MQTTControllerService.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "c2/protocols/RESTProtocol.h"
+#include "ConvertBase.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/*
+ * Purpose: ConvertHeartBeat converts heatbeats into MQTT messages.
+ */
+class ConvertHeartBeat: public ConvertBase{
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConvertHeartBeat(std::string name, uuid_t uuid = NULL)
+    : ConvertBase(name, uuid), 
logger_(logging::LoggerFactory<ConvertHeartBeat>::getLogger()) {
+  }
+  // Destructor
+  virtual ~ConvertHeartBeat() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConvertHeartBeat";
+
+public:
+  /**
+   * Function that's executed when the processor is triggered.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) override;
+
+private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertJSONAck.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConvertJSONAck.cpp 
b/extensions/mqtt/processors/ConvertJSONAck.cpp
new file mode 100644
index 0000000..6ea3eaa
--- /dev/null
+++ b/extensions/mqtt/processors/ConvertJSONAck.cpp
@@ -0,0 +1,115 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConvertJSONAck.h"
+
+#include <stdio.h>
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <map>
+#include <set>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "PayloadSerializer.h"
+#include "utils/ByteArrayCallback.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+
+std::string ConvertJSONAck::parseTopicName(const std::string &json) {
+  std::string topic;
+  rapidjson::Document root;
+
+  try {
+    rapidjson::ParseResult ok = root.Parse(json.c_str());
+    if (ok) {
+      if (root.HasMember("agentInfo")) {
+        if (root["agentInfo"].HasMember("identifier")) {
+          std::stringstream topicStr;
+          topicStr << root["agentInfo"]["identifier"].GetString() << "/in";
+          return topicStr.str();
+        }
+      }
+    }
+  } catch (...) {
+
+  }
+  return topic;
+}
+void ConvertJSONAck::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
+  if (nullptr == mqtt_service_) {
+    context->yield();
+    return;
+  }
+  auto flow = session->get();
+
+  if (!flow) {
+    return;
+  }
+
+  /**
+   * This processor expects a JSON response from InvokeHTTP and thus we expect 
a heartbeat ack following that.
+   * Since we are trailing InvokeHTTP
+   */
+  std::string topic;
+  {
+    // expect JSON response from InvokeHTTP and thus we expect a heartbeat and 
then the output from the HTTP
+    c2::C2Payload response_payload(c2::Operation::HEARTBEAT, 
state::UpdateState::READ_COMPLETE, true, true);
+    ReadCallback callback;
+    session->read(flow, &callback);
+
+    topic = parseTopicName(std::string(callback.buffer_.data(), 
callback.buffer_.size()));
+
+    session->transfer(flow, Success);
+
+  }
+  flow = session->get();
+
+  if (!flow) {
+    return;
+  }
+
+  if (!topic.empty()) {
+    ReadCallback callback;
+    session->read(flow, &callback);
+
+    c2::C2Payload response_payload(c2::Operation::HEARTBEAT, 
state::UpdateState::READ_COMPLETE, true, true);
+
+    std::string str(callback.buffer_.data(),callback.buffer_.size());
+    auto payload = parseJsonResponse(response_payload, callback.buffer_);
+
+    auto stream = c2::mqtt::PayloadSerializer::serialize(payload);
+
+    mqtt_service_->send(topic, stream->getBuffer(), stream->getSize());
+
+  }
+
+  session->transfer(flow, Success);
+
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertJSONAck.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h 
b/extensions/mqtt/processors/ConvertJSONAck.h
new file mode 100644
index 0000000..e7331c8
--- /dev/null
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -0,0 +1,105 @@
+/**
+ * @file ConvertAck.h
+ * ConvertAck class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONVERT_ACKNOWLEDGEMENT_H__
+#define __CONVERT_ACKNOWLEDGEMENT_H__
+
+#include "MQTTControllerService.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "c2/protocols/RESTProtocol.h"
+#include "ConvertBase.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Purpose: Converts JSON acks into an MQTT consumable by
+ * MQTTC2Protocol.
+ */
+class ConvertJSONAck : public ConvertBase {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConvertJSONAck(std::string name, uuid_t uuid = NULL)
+      : ConvertBase(name, uuid),
+        logger_(logging::LoggerFactory<ConvertJSONAck>::getLogger()) {
+  }
+  // Destructor
+  virtual ~ConvertJSONAck() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConvertJSONAck";
+
+
+ public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) override;
+
+ protected:
+
+  class ReadCallback : public InputStreamCallback {
+   public:
+    ReadCallback() {
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t ret = 0;
+      if (nullptr == stream)
+        return 0;
+      buffer_.resize(stream->getSize());
+      ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), 
stream->getSize());
+      return ret;
+    }
+    std::vector<char> buffer_;
+  };
+
+  /**
+   * Parse Topic name from the json -- given a known structure that we expect.
+   * @param json json representation defined by the restful protocol
+   */
+  std::string parseTopicName(const std::string &json);
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertUpdate.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp 
b/extensions/mqtt/processors/ConvertUpdate.cpp
new file mode 100644
index 0000000..264a4e0
--- /dev/null
+++ b/extensions/mqtt/processors/ConvertUpdate.cpp
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConvertUpdate.h"
+#include "utils/HTTPClient.h"
+#include "io/BaseStream.h"
+#include "io/DataStream.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property ConvertUpdate::SSLContext("SSL Context Service", "The SSL 
Context Service used to provide client certificate information for TLS/SSL 
(https) connections.", "");
+
+void ConvertUpdate::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
+  if (nullptr == mqtt_service_) {
+    context->yield();
+    return;
+  }
+  std::vector<uint8_t> update;
+  bool received_update = false;
+  while (mqtt_service_->get(100, listening_topic, update)) {
+    // first we have the input topic string followed by the update URI
+    if (update.size() > 0) {
+
+      io::DataStream dataStream(update.data(), update.size());
+      io::BaseStream stream(&dataStream);
+
+      std::string returnTopic, url;
+
+      if (returnTopic.empty() || url.empty()) {
+        logger_->log_debug("topic and/or URL are empty");
+        break;
+      }
+
+      stream.readUTF(returnTopic);
+      stream.readUTF(url);
+
+      /**
+       * Not having curl support is actually okay for MQTT to be built, but 
running the update processor requires
+       * that we have curl available.
+       */
+      auto client_ptr = 
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", 
"HTTPClient");
+      if (nullptr == client_ptr) {
+        logger_->log_error("Could not locate HTTPClient. You do not have cURL 
support!");
+        return;
+      }
+      std::unique_ptr<utils::BaseHTTPClient> client = 
std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
+      client->initialize("GET");
+      client->setConnectionTimeout(2000);
+      client->setReadTimeout(2000);
+
+      if (client->submit()) {
+        auto data = client->getResponseBody();
+        std::vector<uint8_t> raw_data;
+        std::transform(std::begin(data), std::end(data), 
std::back_inserter(raw_data), [](char c) {
+          return (uint8_t)c;
+        });
+        mqtt_service_->send(returnTopic, raw_data);
+      }
+
+      received_update = true;
+    } else {
+      break;
+    }
+  }
+
+  if (!received_update) {
+    context->yield();
+  }
+
+}
+
+void ConvertUpdate::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(MQTTControllerService);
+  properties.insert(ListeningTopic);
+  properties.insert(SSLContext);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/ConvertUpdate.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/ConvertUpdate.h 
b/extensions/mqtt/processors/ConvertUpdate.h
new file mode 100644
index 0000000..a04529a
--- /dev/null
+++ b/extensions/mqtt/processors/ConvertUpdate.h
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_MQTT_PROTOCOL_CONVERTUPDATE_H_
+#define EXTENSIONS_MQTT_PROTOCOL_CONVERTUPDATE_H_
+
+
+#include "MQTTControllerService.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "c2/protocols/RESTProtocol.h"
+#include "ConvertBase.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Purpose: Converts update messages into the appropriate Restful call
+ *
+ * Justification: The other protocol classes are responsible for standard 
messaging, which carries most
+ * heartbeat related activity; however, updates generally connect to a 
different source. This processor will
+ * retrieve the updates and respond via MQTT.
+ */
+class ConvertUpdate : public ConvertBase {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConvertUpdate(std::string name, uuid_t uuid = NULL)
+    : ConvertBase(name, uuid), 
logger_(logging::LoggerFactory<ConvertUpdate>::getLogger()) {
+  }
+  // Destructor
+  virtual ~ConvertUpdate() {
+  }
+
+  static core::Property SSLContext;
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConvertUpdate";
+
+public:
+
+  /**
+     * Initialization of the processor
+     */
+    virtual void initialize() override;
+  /**
+   * Function that's executed when the processor is triggered.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) override;
+
+protected:
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
+private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_MQTT_PROTOCOL_CONVERTUPDATE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/PublishMQTT.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp 
b/extensions/mqtt/processors/PublishMQTT.cpp
new file mode 100644
index 0000000..411cc2d
--- /dev/null
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -0,0 +1,106 @@
+/**
+ * @file PublishMQTT.cpp
+ * PublishMQTT class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PublishMQTT.h"
+#include <stdio.h>
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <map>
+#include <set>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property PublishMQTT::Retain("Retain", "Retain MQTT published record in 
broker", "false");
+core::Property PublishMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum 
flow content payload segment size for the MQTT record", "");
+
+void PublishMQTT::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  properties.insert(Retain);
+  properties.insert(MaxFlowSegSize);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void PublishMQTT::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  AbstractMQTTProcessor::onSchedule(context, sessionFactory);
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() 
&& core::Property::StringToInt(value, valInt)) {
+    max_seg_size_ = valInt;
+    logger_->log_debug("PublishMQTT: max flow segment size [%ll]", 
max_seg_size_);
+  }
+  value = "";
+  if (context->getProperty(Retain.getName(), value) && !value.empty() && 
org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, retain_)) {
+    logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
+  }
+}
+
+void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<core::FlowFile> flowFile = session->get();
+
+  if (!flowFile) {
+    return;
+  }
+
+  if (!reconnect()) {
+    logger_->log_error("MQTT connect to %s failed", uri_);
+    session->transfer(flowFile, Failure);
+    return;
+  }
+
+  PublishMQTT::ReadCallback callback(flowFile->getSize(), max_seg_size_, 
topic_, client_, qos_, retain_, delivered_token_);
+  session->read(flowFile, &callback);
+  if (callback.status_ < 0) {
+    logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
+    session->transfer(flowFile, Failure);
+  } else {
+    logger_->log_debug("Sent flow with length %d to MQTT topic %s", 
callback.read_size_, topic_);
+    session->transfer(flowFile, Success);
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/PublishMQTT.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/PublishMQTT.h 
b/extensions/mqtt/processors/PublishMQTT.h
new file mode 100644
index 0000000..67a0d7f
--- /dev/null
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+    : processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) {
+    retain_ = false;
+    max_seg_size_ = ULLONG_MAX;
+  }
+  // Destructor
+  virtual ~PublishMQTT() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "PublishMQTT";
+  // Supported Properties
+  static core::Property Retain;
+  static core::Property MaxFlowSegSize;
+
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string 
&key, MQTTClient client,
+        int qos, bool retain, MQTTClient_deliveryToken &token) :
+        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), 
client_(client),
+        qos_(qos), retain_(retain), token_(token) {
+      status_ = 0;
+      read_size_ = 0;
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      if (flow_size_ < max_seg_size_)
+        max_seg_size_ = flow_size_;
+      std::vector<unsigned char> buffer;
+      buffer.reserve(max_seg_size_);
+      read_size_ = 0;
+      status_ = 0;
+      while (read_size_ < flow_size_) {
+        int readRet = stream->read(&buffer[0], max_seg_size_);
+        if (readRet < 0) {
+          status_ = -1;
+          return read_size_;
+        }
+        if (readRet > 0) {
+          MQTTClient_message pubmsg = MQTTClient_message_initializer;
+          pubmsg.payload = &buffer[0];
+          pubmsg.payloadlen = readRet;
+          pubmsg.qos = qos_;
+          pubmsg.retained = retain_;
+          if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, 
&token_) != MQTTCLIENT_SUCCESS) {
+            status_ = -1;
+            return -1;
+          }
+          read_size_ += readRet;
+        } else {
+          break;
+        }
+      }
+      return read_size_;
+    }
+    uint64_t flow_size_;
+    uint64_t max_seg_size_;
+    std::string key_;
+    MQTTClient client_;;
+    int status_;
+    size_t read_size_;
+    int qos_;
+    int retain_;
+    MQTTClient_deliveryToken &token_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory);
+  // OnTrigger method, implemented by NiFi PublishMQTT
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session);
+  // Initialize, over write by NiFi PublishMQTT
+  virtual void initialize(void);
+
+protected:
+
+private:
+  uint64_t max_seg_size_;
+  bool retain_;
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE (PublishMQTT);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/protocol/MQTTC2Protocol.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.cpp 
b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
new file mode 100644
index 0000000..03a6c8d
--- /dev/null
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQTTC2Protocol.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+MQTTC2Protocol::MQTTC2Protocol(std::string name, uuid_t uuid)
+    : C2Protocol(name, uuid),
+      logger_(logging::LoggerFactory<Connectable>::getLogger()) {
+}
+
+MQTTC2Protocol::~MQTTC2Protocol() {
+}
+
+void MQTTC2Protocol::initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<Configure> &configure) {
+  if (configure->get("nifi.c2.mqtt.connector.service", 
controller_service_name_)) {
+    auto service = controller->getControllerService(controller_service_name_);
+    mqtt_service_ = 
std::static_pointer_cast<controllers::MQTTControllerService>(service);
+  } else
+    mqtt_service_ = nullptr;
+
+  agent_identifier_ = configure->getAgentIdentifier();
+
+  std::stringstream outputStream;
+  std::string updateTopicOpt, heartbeatTopicOpt;
+  if (configure->get("nifi.c2.mqtt.heartbeat.topic", heartbeatTopicOpt)) {
+    heartbeat_topic_ = heartbeatTopicOpt;
+  } else {
+    heartbeat_topic_ = "heartbeats";  // outputStream.str();
+  }
+  if (configure->get("nifi.c2.mqtt.update.topic", updateTopicOpt)) {
+    update_topic_ = updateTopicOpt;
+  } else {
+    update_topic_ = "updates";
+  }
+
+  std::stringstream inputStream;
+  inputStream << agent_identifier_ << "/in";
+  in_topic_ = inputStream.str();
+
+  if (mqtt_service_) {
+    mqtt_service_->subscribeToTopic(in_topic_);
+  }
+}
+
+C2Payload MQTTC2Protocol::consumePayload(const std::string &url, const 
C2Payload &payload, Direction direction, bool async) {
+  // we are getting an update.
+  std::lock_guard<std::mutex> lock(input_mutex_);
+  io::BaseStream stream;
+  stream.writeUTF(in_topic_);
+  stream.writeUTF(url);
+  std::vector<uint8_t> response;
+  auto transmit_id = mqtt_service_->send(update_topic_, stream.getBuffer(), 
stream.getSize());
+  if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, 
in_topic_, response)) {
+    C2Payload response_payload(payload.getOperation(), 
state::UpdateState::READ_COMPLETE, true, true);
+    response_payload.setRawData(response);
+    return response_payload;
+  } else {
+    return C2Payload(payload.getOperation(), 
state::UpdateState::READ_COMPLETE, true);
+  }
+}
+
+C2Payload MQTTC2Protocol::serialize(const C2Payload &payload) {
+  if (mqtt_service_ == nullptr || !mqtt_service_->isRunning()) {
+    return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, 
true);
+  }
+
+  std::lock_guard<std::mutex> lock(input_mutex_);
+
+  auto stream = c2::mqtt::PayloadSerializer::serialize(payload);
+
+  auto transmit_id = mqtt_service_->send(heartbeat_topic_, 
stream->getBuffer(), stream->getSize());
+  std::vector<uint8_t> response;
+  if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, 
in_topic_, response)) {
+    return  c2::mqtt::PayloadSerializer::deserialize(response);
+  }
+  return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, 
true);
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/protocol/MQTTC2Protocol.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.h 
b/extensions/mqtt/protocol/MQTTC2Protocol.h
new file mode 100644
index 0000000..460e5d4
--- /dev/null
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.h
@@ -0,0 +1,96 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_MQTT_PROTOCOL_MQTTC2PROTOCOL_H_
+#define EXTENSIONS_MQTT_PROTOCOL_MQTTC2PROTOCOL_H_
+
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "../controllerservice/MQTTControllerService.h"
+#include "c2/C2Protocol.h"
+#include "io/BaseStream.h"
+#include "agent/agent_version.h"
+#include "PayloadSerializer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose: Implementation of the MQTT C2 protocol. Serializes messages to and 
from
+ * and mqtt server.
+ */
+class MQTTC2Protocol : public C2Protocol {
+ public:
+  explicit MQTTC2Protocol(std::string name, uuid_t uuid = nullptr);
+
+  virtual ~MQTTC2Protocol();
+
+  /**
+   * Consume the payload.
+   * @param url to evaluate.
+   * @param payload payload to consume.
+   * @direction direction of operation.
+   */
+  virtual C2Payload consumePayload(const std::string &url, const C2Payload 
&payload, Direction direction, bool async) override;
+
+  virtual C2Payload consumePayload(const C2Payload &payload, Direction 
direction, bool async) override {
+    return serialize(payload);
+  }
+
+  virtual void update(const std::shared_ptr<Configure> &configure) override {
+    // no op.
+  }
+
+  virtual void initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<Configure> &configure) override;
+
+ protected:
+
+  C2Payload serialize(const C2Payload &payload);
+
+  std::mutex input_mutex_;
+  // input topic on which we will listen.
+  std::string in_topic_;
+  // agent identifier
+  std::string agent_identifier_;
+  // heartbeat topic name.
+  std::string heartbeat_topic_;
+  // update topic name.
+  std::string update_topic_;
+
+  // mqtt controller service reference.
+  std::shared_ptr<controllers::MQTTControllerService> mqtt_service_;
+  std::shared_ptr<logging::Logger> logger_;
+  //mqtt controller serviec name.
+  std::string controller_service_name_;
+
+
+};
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* EXTENSIONS_MQTT_PROTOCOL_MQTTC2PROTOCOL_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/protocol/PayloadSerializer.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp 
b/extensions/mqtt/protocol/PayloadSerializer.cpp
new file mode 100644
index 0000000..5f62227
--- /dev/null
+++ b/extensions/mqtt/protocol/PayloadSerializer.cpp
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PayloadSerializer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+namespace mqtt {
+
+PayloadSerializer::PayloadSerializer() {
+}
+
+PayloadSerializer::~PayloadSerializer() {
+}
+
+} /* namespace mqtt */
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/protocol/PayloadSerializer.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/protocol/PayloadSerializer.h 
b/extensions/mqtt/protocol/PayloadSerializer.h
new file mode 100644
index 0000000..27b3362
--- /dev/null
+++ b/extensions/mqtt/protocol/PayloadSerializer.h
@@ -0,0 +1,318 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef EXTENSIONS_MQTT_PROTOCOL_PAYLOADSERIALIZER_H_
+#define EXTENSIONS_MQTT_PROTOCOL_PAYLOADSERIALIZER_H_
+
+#include "c2/C2Protocol.h"
+#include "io/BaseStream.h"
+#include "core/state/Value.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+namespace mqtt {
+
+class PayloadSerializer {
+ public:
+
+  /**
+   * Static function that serializes the value nodes
+   */
+  static void serializeValueNode(state::response::ValueNode &value, 
std::shared_ptr<io::BaseStream> stream) {
+    auto base_type = value.getValue();
+    uint8_t type = 0x00;
+    if (auto sub_type = 
std::dynamic_pointer_cast<state::response::IntValue>(base_type)) {
+      type = 1;
+      stream->write(&type, 1);
+      uint32_t value = sub_type->getValue();
+      stream->write(value);
+    } else if (auto sub_type = 
std::dynamic_pointer_cast<state::response::Int64Value>(base_type)) {
+      type = 2;
+      stream->write(&type, 1);
+      uint64_t value = sub_type->getValue();
+      stream->write(value);
+    } else if (auto sub_type = 
std::dynamic_pointer_cast<state::response::BoolValue>(base_type)) {
+      type = 3;
+      stream->write(&type, 1);
+      if (sub_type->getValue()) {
+        type = 1;
+
+      } else
+        type = 0;
+      stream->write(&type, 1);
+    } else {
+      auto str = base_type->getStringValue();
+      type = 4;
+      stream->write(&type, 1);
+      stream->writeUTF(str);
+    }
+  }
+  static void serialize(uint8_t op, const C2Payload &payload, 
std::shared_ptr<io::BaseStream> stream) {
+    uint8_t st;
+    uint32_t size = payload.getNestedPayloads().size();
+    stream->write(size);
+    for (auto nested_payload : payload.getNestedPayloads()) {
+      op = opToInt(nested_payload.getOperation());
+      stream->write(&op, 1);
+      stream->write(&st, 1);
+      stream->writeUTF(nested_payload.getLabel());
+      stream->writeUTF(nested_payload.getIdentifier());
+      const std::vector<C2ContentResponse> &content = 
nested_payload.getContent();
+      size = content.size();
+      stream->write(size);
+      for (const auto &payload_content : content) {
+        stream->writeUTF(payload_content.name);
+        size = payload_content.operation_arguments.size();
+        stream->write(size);
+        for (auto content : payload_content.operation_arguments) {
+          stream->writeUTF(content.first);
+          serializeValueNode(content.second, stream);
+        }
+      }
+      if (nested_payload.getNestedPayloads().size() > 0) {
+        serialize(op, nested_payload, stream);
+      } else {
+        size = 0;
+        stream->write(size);
+      }
+    }
+  }
+
+  static uint8_t opToInt(const Operation opt) {
+    uint8_t op;
+
+    switch (opt) {
+      case Operation::ACKNOWLEDGE:
+        op = 1;
+        break;
+      case Operation::HEARTBEAT:
+        op = 2;
+        break;
+      case Operation::RESTART:
+        op = 3;
+        break;
+      case Operation::DESCRIBE:
+        op = 4;
+        break;
+      case Operation::STOP:
+        op = 5;
+        break;
+      case Operation::START:
+        op = 6;
+        break;
+      case Operation::UPDATE:
+        op = 7;
+        break;
+      default:
+        op = 2;
+        break;
+    }
+    return op;
+  }
+  static std::shared_ptr<io::BaseStream> serialize(const C2Payload &payload) {
+    std::shared_ptr<io::BaseStream> stream = 
std::make_shared<io::BaseStream>();
+    uint8_t op, st = 0;
+    op = opToInt(payload.getOperation());
+    stream->write(&op, 1);
+    if (payload.getStatus().getState() == state::UpdateState::NESTED) {
+      st = 1;
+      stream->write(&st, 1);
+    } else {
+      st = 0;
+      stream->write(&st, 1);
+    }
+    stream->writeUTF(payload.getLabel());
+
+    stream->writeUTF(payload.getIdentifier());
+    const std::vector<C2ContentResponse> &content = payload.getContent();
+    uint32_t size = content.size();
+    stream->write(size);
+    for (const auto &payload_content : content) {
+      stream->writeUTF(payload_content.name);
+      size = payload_content.operation_arguments.size();
+      stream->write(size);
+      for (auto content : payload_content.operation_arguments) {
+        stream->writeUTF(content.first);
+        serializeValueNode(content.second, stream);
+      }
+    }
+    serialize(op, payload, stream);
+    return stream;
+  }
+
+  static state::response::ValueNode deserializeValueNode(io::BaseStream 
*stream) {
+    uint8_t type = 0;
+    stream->read(&type, 1);
+    state::response::ValueNode node;
+    switch (type) {
+      case 1:
+        uint32_t thb;
+        stream->read(thb);
+        node = thb;
+        break;
+      case 2:
+        uint64_t base;
+        stream->read(base);
+        node = base;
+        break;
+      case 3:
+        stream->read(&type, 1);
+        if (type == 1)
+          node = true;
+        else
+          node = false;
+        break;
+      default:
+      case 4:
+        std::string str;
+        stream->readUTF(str);
+        node = str;
+    }
+    return node;
+  }
+  static C2Payload deserialize(std::vector<uint8_t> data) {
+    C2Payload payload(Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, 
true);
+    if (deserialize(data, payload)) {
+      return payload;
+    }
+    return C2Payload(Operation::HEARTBEAT, state::UpdateState::READ_ERROR, 
true);
+  }
+  /**
+   * Deserializes the payloads
+   * @param parent payload to deserialize.
+   * @param operation of parent payload
+   * @param identifier for this payload
+   * @param stream base stream in which we will serialize the parent payload.
+   */
+  static bool deserializePayload(C2Payload &parent, Operation operation, 
std::string identifier, io::BaseStream *stream) {
+    uint32_t payloads = 0;
+    stream->read(payloads);
+    uint8_t op, st;
+    std::string label;
+    for (size_t i = 0; i < payloads; i++) {
+      stream->read(op);
+      stream->read(st);
+      stream->readUTF(label);
+      stream->readUTF(identifier);
+      operation = intToOp(op);
+      C2Payload subPayload(operation, st == 1 ? state::UpdateState::NESTED : 
state::UpdateState::READ_COMPLETE);
+      subPayload.setIdentifier(identifier);
+      subPayload.setLabel(label);
+      uint32_t content_size = 0;
+      stream->read(content_size);
+      for (uint32_t i = 0; i < content_size; i++) {
+        std::string content_name;
+        uint32_t args = 0;
+        C2ContentResponse content(operation);
+        stream->readUTF(content_name);
+        content.name = content_name;
+        stream->read(args);
+        for (uint32_t j = 0; j < args; j++) {
+          std::string first, second;
+          stream->readUTF(first);
+          content.operation_arguments[first] = deserializeValueNode(stream);
+        }
+        subPayload.addContent(std::move(content));
+      }
+      deserializePayload(subPayload, operation, identifier, stream);
+      parent.addPayload(std::move(subPayload));
+
+    }
+    return true;
+  }
+  static bool deserialize(std::vector<uint8_t> data, C2Payload &payload) {
+    io::DataStream dataStream(data.data(), data.size());
+    io::BaseStream stream(&dataStream);
+
+    uint8_t op, st = 0;
+    ;
+
+    std::string identifier, label;
+    // read op
+    stream.read(op);
+    stream.read(st);
+    stream.readUTF(label);
+    stream.readUTF(identifier);
+
+    Operation operation = intToOp(op);
+
+    C2Payload newPayload(operation, st == 1 ? state::UpdateState::NESTED : 
state::UpdateState::READ_COMPLETE);
+    newPayload.setIdentifier(identifier);
+    newPayload.setLabel(label);
+
+    uint32_t content_size = 0;
+    stream.read(content_size);
+    for (size_t i = 0; i < content_size; i++) {
+      std::string content_name;
+      uint32_t args = 0;
+      C2ContentResponse content(operation);
+      stream.readUTF(content_name);
+      content.name = content_name;
+      stream.read(args);
+      for (uint32_t j = 0; j < args; j++) {
+        std::string first, second;
+        stream.readUTF(first);
+        //stream.readUTF(second);
+        content.operation_arguments[first] = deserializeValueNode(&stream);
+      }
+      newPayload.addContent(std::move(content));
+    }
+
+    deserializePayload(newPayload, operation, identifier, &stream);
+    // we're finished
+    payload = std::move(newPayload);
+    return true;
+  }
+ private:
+
+  static Operation intToOp(int op) {
+    switch (op) {
+      case 1:
+        return Operation::ACKNOWLEDGE;
+      case 2:
+        return Operation::HEARTBEAT;
+      case 3:
+        return Operation::RESTART;
+      case 4:
+        return Operation::DESCRIBE;
+      case 5:
+        return Operation::STOP;
+      case 6:
+        return Operation::START;
+      case 7:
+        return Operation::UPDATE;
+      default:
+        return Operation::HEARTBEAT;
+        ;
+    }
+
+  }
+  PayloadSerializer();
+  virtual ~PayloadSerializer();
+};
+
+} /* namespace mqtt */
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_MQTT_PROTOCOL_PAYLOADSERIALIZER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/rocksdb-repos/DatabaseContentRepository.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h 
b/extensions/rocksdb-repos/DatabaseContentRepository.h
index d469569..49e7227 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -41,7 +41,9 @@ class StringAppender : public 
rocksdb::AssociativeMergeOperator {
 
   virtual bool Merge(const rocksdb::Slice& key, const rocksdb::Slice* 
existing_value, const rocksdb::Slice& value, std::string* new_value, 
rocksdb::Logger* logger) const {
     // Clear the *new_value for writing.
-    assert(new_value);
+    if (nullptr == new_value) {
+      return false;
+    }
     new_value->clear();
 
     if (!existing_value) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/c2/C2Payload.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index 4d4b0ad..cbe2cb1 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -165,6 +165,11 @@ class C2Payload : public state::Update {
   void setRawData(const std::vector<char> &data);
 
   /**
+   * Sets raw data from a vector of uint8_t within this object.
+   */
+  void setRawData(const std::vector<uint8_t> &data);
+
+  /**
    * Returns raw data.
    */
   std::vector<char> getRawData() const;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/core/controller/ControllerService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/ControllerService.h 
b/libminifi/include/core/controller/ControllerService.h
index 8f52772..b754aa9 100644
--- a/libminifi/include/core/controller/ControllerService.h
+++ b/libminifi/include/core/controller/ControllerService.h
@@ -125,8 +125,13 @@ class ControllerService : public ConfigurableComponent, 
public Connectable {
     return false;
   }
 
+  void setLinkedControllerServices( const 
std::vector<std::shared_ptr<controller::ControllerService> > &services ){
+    linked_services_ = services;
+  }
+
  protected:
 
+  std::vector<std::shared_ptr<controller::ControllerService> > 
linked_services_;
   std::shared_ptr<Configure> configuration_;
   std::atomic<ControllerServiceState> current_state_;
   virtual bool canEdit() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/core/controller/StandardControllerServiceProvider.h
----------------------------------------------------------------------
diff --git 
a/libminifi/include/core/controller/StandardControllerServiceProvider.h 
b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index 4d70757..678f3f5 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -110,7 +110,7 @@ class StandardControllerServiceProvider : public 
ControllerServiceProvider, publ
   }
 
   virtual void enableAllControllerServices() {
-    logger_->log_info("Enabling %ll controller services", 
controller_map_->getAllControllerServices().size());
+    logger_->log_info("Enabling %u controller services", 
controller_map_->getAllControllerServices().size());
     for (auto service : controller_map_->getAllControllerServices()) {
 
       if (service->canEnable()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h 
b/libminifi/include/properties/Configure.h
index 66a9351..3646967 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -20,6 +20,7 @@
 #ifndef __CONFIGURE_H__
 #define __CONFIGURE_H__
 
+#include <mutex>
 #include "properties/Properties.h"
 
 namespace org {
@@ -29,6 +30,15 @@ namespace minifi {
 
 class Configure : public Properties {
  public:
+
+  void setAgentIdentifier(const std::string &identifier) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    agent_identifier_ = identifier;
+  }
+  std::string getAgentIdentifier() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return agent_identifier_;
+  }
   // nifi.flow.configuration.file
   static const char *nifi_default_directory;
   static const char *nifi_c2_enable;
@@ -70,6 +80,10 @@ class Configure : public Properties {
   // nifi rest api user name and password
   static const char *nifi_rest_api_user_name;
   static const char *nifi_rest_api_password;
+
+ private:
+  std::string agent_identifier_;
+  std::mutex mutex_;
 };
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/include/utils/ByteArrayCallback.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ByteArrayCallback.h 
b/libminifi/include/utils/ByteArrayCallback.h
index 653159c..49249a7 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -92,9 +92,9 @@ class ByteOutputCallback : public OutputStreamCallback {
  public:
   ByteOutputCallback() = delete;
 
-  explicit ByteOutputCallback(size_t max_size, bool wait_on_read=false)
+  explicit ByteOutputCallback(size_t max_size, bool wait_on_read = false)
       : max_size_(max_size),
-        read_started_( wait_on_read ? false : true ),
+        read_started_(wait_on_read ? false : true),
         logger_(logging::LoggerFactory<ByteOutputCallback>::getLogger()) {
     current_str_pos = 0;
     size_ = 0;
@@ -149,6 +149,18 @@ class ByteOutputCallback : public OutputStreamCallback {
 
 };
 
+class StreamOutputCallback : public ByteOutputCallback {
+ public:
+  explicit StreamOutputCallback(size_t max_size, bool wait_on_read = false)
+      : ByteOutputCallback(max_size, wait_on_read) {
+
+  }
+
+  virtual void write(char *data, size_t size);
+
+  virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+};
+
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 1414765..688c276 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -360,6 +360,13 @@ void FlowController::initializeC2() {
   } else {
     c2_enabled_ = true;
   }
+
+  std::string identifier_str;
+  if (!configuration_->get("nifi.c2.agent.identifier", identifier_str) || 
identifier_str.empty()) {
+    // set to the flow controller's identifier
+    identifier_str = uuidStr_;
+  }
+  configuration_->setAgentIdentifier(identifier_str);
   state::StateManager::initialize();
 
   std::shared_ptr<c2::C2Agent> agent = 
std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()),
 std::dynamic_pointer_cast<FlowController>(shared_from_this()),
@@ -409,13 +416,8 @@ void FlowController::initializeC2() {
       auto identifier = 
std::dynamic_pointer_cast<state::response::AgentIdentifier>(processor);
 
       if (identifier != nullptr) {
-        std::string identifier_str;
-        if (configuration_->get("nifi.c2.agent.identifier", identifier_str) && 
!identifier_str.empty()) {
-          identifier->setIdentifier(identifier_str);
-        } else {
-          // set to the flow controller's identifier
-          identifier->setIdentifier(uuidStr_);
-        }
+
+        identifier->setIdentifier(identifier_str);
 
         std::string class_str;
         if (configuration_->get("nifi.c2.agent.class", class_str) && 
!class_str.empty()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 077aefe..b6d2825 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -108,6 +108,7 @@ void C2Agent::configure(const std::shared_ptr<Configure> 
&configure, bool reconf
     auto protocol = 
core::ClassLoader::getDefaultClassLoader().instantiateRaw(clazz, clazz);
 
     if (protocol == nullptr) {
+      logger_->log_info("Class %s not found", clazz);
       protocol = 
core::ClassLoader::getDefaultClassLoader().instantiateRaw("RESTSender", 
"RESTSender");
 
       if (!protocol) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/c2/C2Payload.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp
index 3807b12..cec2a7e 100644
--- a/libminifi/src/c2/C2Payload.cpp
+++ b/libminifi/src/c2/C2Payload.cpp
@@ -178,6 +178,13 @@ void C2Payload::setRawData(const std::vector<char> &data) {
   raw_data_.insert(std::end(raw_data_), std::begin(data), std::end(data));
 }
 
+void C2Payload::setRawData(const std::vector<uint8_t> &data) {
+  std::transform(std::begin(data), std::end(data), 
std::back_inserter(raw_data_),[](uint8_t c){
+    return (char)c;
+  });
+}
+
+
 std::vector<char> C2Payload::getRawData() const {
   return raw_data_;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index fde2bc6..9b732dc 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -830,7 +830,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
   std::shared_ptr<Connectable> first = 
process_context_->getProcessorNode()->getNextIncomingConnection();
 
   if (first == NULL) {
-    logger_->log_debug("Get is null for %s", 
process_context_->getProcessorNode()->getName());
+    logger_->log_trace("Get is null for %s", 
process_context_->getProcessorNode()->getName());
     return NULL;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/core/controller/StandardControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp 
b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
index 5c4aa70..ae043da 100644
--- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
@@ -53,6 +53,12 @@ bool StandardControllerServiceNode::enable() {
   }
   std::shared_ptr<ControllerService> impl = 
getControllerServiceImplementation();
   if (nullptr != impl) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::vector<std::shared_ptr<ControllerService> > services;
+    for(auto service : linked_controller_services_ ){
+      services.push_back ( service->getControllerServiceImplementation());
+    }
+    impl->setLinkedControllerServices( services );
     impl->onEnable();
   }
   return true;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/libminifi/src/utils/ByteArrayCallback.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp 
b/libminifi/src/utils/ByteArrayCallback.cpp
index ed9ffb7..b0c0dbc 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -37,6 +37,21 @@ int64_t 
ByteOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
   return size_.load();
 }
 
+int64_t StreamOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  stream->seek(0);
+  std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
+  auto written = readFully(buffer.get(), size_);
+  stream->writeData(reinterpret_cast<uint8_t*>(buffer.get()), written);
+  return stream->getSize();
+
+}
+
+void StreamOutputCallback::write(char *data, size_t size) {
+  if (!is_alive_)
+    return;
+  write_and_notify(data, size);
+}
+
 const std::vector<char> ByteOutputCallback::to_string() {
   std::vector<char> buffer;
   buffer.resize(size_.load());

Reply via email to