Github user apiri commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/237#discussion_r161234180
--- Diff: libminifi/src/c2/ControllerSocketProtocol.cpp ---
@@ -0,0 +1,245 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/ControllerSocketProtocol.h"
+#include "utils/StringUtils.h"
+#include "io/DescriptorStream.h"
+#include <utility>
+#include <memory>
+#include <vector>
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+void ControllerSocketProtocol::initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink,
+ const std::shared_ptr<Configure>
&configuration) {
+ HeartBeatReporter::initialize(controller, updateSink, configuration);
+ stream_factory_ =
std::make_shared<minifi::io::StreamFactory>(configuration);
+
+ std::string host, port, caCert;
+ if (nullptr != configuration_ &&
configuration_->get("controller.socket.host", host) &&
configuration_->get("controller.socket.port", port)) {
+ server_socket_ = std::unique_ptr<io::ServerSocket>(new
io::ServerSocket(nullptr, host, std::stoi(port), 2));
+ server_socket_->initialize(true);
+
+ auto check = [this]() -> bool {
+ return update_sink_->isRunning();
+ };
+
+ auto handler = [this](int fd) {
+ uint8_t head;
+ io::DescriptorStream stream(fd);
+ if (stream.read(head) != 1) {
+ logger_->log_debug("Connection broke with fd %d", fd);
+ return;
+ }
+ switch (head) {
+ case Operation::START:
+ {
+ std::string componentStr;
+ int size = stream.readUTF(componentStr);
+ if ( size != -1 ) {
+ auto components = update_sink_->getComponents(componentStr);
+ for (auto component : components) {
+ component->start();
+ }
+ } else {
+ logger_->log_debug("Connection broke with fd %d", fd);
+ }
+ }
+ break;
+ case Operation::STOP:
+ {
+ std::string componentStr;
+ int size = stream.readUTF(componentStr);
+ if ( size != -1 ) {
+ auto components = update_sink_->getComponents(componentStr);
+ for (auto component : components) {
+ component->stop(true, 1000);
+ }
+ } else {
+ logger_->log_debug("Connection broke with fd %d", fd);
+ }
+ }
+ break;
+ case Operation::CLEAR:
+ {
+ std::string connection;
+ int size = stream.readUTF(connection);
+ if ( size != -1 ) {
+ update_sink_->clearConnection(connection);
+ }
+ }
+ break;
+ case Operation::UPDATE:
+ {
+ std::string what;
+ int size = stream.readUTF(what);
+ if (size == -1) {
+ logger_->log_debug("Connection broke with fd %d", fd);
+ break;
+ }
+ if (what == "flow") {
+ std::string ff_loc;
+ int size = stream.readUTF(ff_loc);
+ std::ifstream tf(ff_loc);
+ std::string configuration((std::istreambuf_iterator<char>(tf)),
+ std::istreambuf_iterator<char>());
+ if (size == -1) {
+ logger_->log_debug("Connection broke with fd %d", fd);
+ break;
+ }
+ update_sink_->applyUpdate(configuration);
+ }
+ }
+ break;
+ case Operation::DESCRIBE:
+ {
+ std::string what;
+ int size = stream.readUTF(what);
+ if (size == -1) {
+ logger_->log_debug("Connection broke with fd %d", fd);
+ break;
+ }
+ if (what == "queue") {
+ std::string connection;
+ int size = stream.readUTF(connection);
+ if (size == -1) {
+ logger_->log_debug("Connection broke with fd %d", fd);
+ break;
+ }
+ std::stringstream response;
+ {
+ std::lock_guard<std::mutex> lock(controller_mutex_);
+ response << queue_size_[connection] << " / " <<
queue_max_[connection];
+ }
+ io::BaseStream resp;
+ resp.writeData(&head, 1);
+ resp.writeUTF(response.str());
+ write(fd, resp.getBuffer(), resp.getSize());
+ } else if (what == "processors") {
--- End diff --
minor: "processors" is a little broad, given that we are getting all
components, so this is bundling in ports as well. Does "components" make more
sense?
---