Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/237#discussion_r161234180
  
    --- Diff: libminifi/src/c2/ControllerSocketProtocol.cpp ---
    @@ -0,0 +1,245 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include "c2/ControllerSocketProtocol.h"
    +#include "utils/StringUtils.h"
    +#include "io/DescriptorStream.h"
    +#include <utility>
    +#include <memory>
    +#include <vector>
    +#include <string>
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace c2 {
    +
    +void ControllerSocketProtocol::initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<state::StateMonitor> &updateSink,
    +                                          const std::shared_ptr<Configure> 
&configuration) {
    +  HeartBeatReporter::initialize(controller, updateSink, configuration);
    +  stream_factory_ = 
std::make_shared<minifi::io::StreamFactory>(configuration);
    +
    +  std::string host, port, caCert;
    +  if (nullptr != configuration_ && 
configuration_->get("controller.socket.host", host) && 
configuration_->get("controller.socket.port", port)) {
    +    server_socket_ = std::unique_ptr<io::ServerSocket>(new 
io::ServerSocket(nullptr, host, std::stoi(port), 2));
    +    server_socket_->initialize(true);
    +
    +    auto check = [this]() -> bool {
    +      return update_sink_->isRunning();
    +    };
    +
    +    auto handler = [this](int fd) {
    +      uint8_t head;
    +      io::DescriptorStream stream(fd);
    +      if (stream.read(head) != 1) {
    +        logger_->log_debug("Connection broke with fd %d", fd);
    +        return;
    +      }
    +      switch (head) {
    +        case Operation::START:
    +        {
    +          std::string componentStr;
    +          int size = stream.readUTF(componentStr);
    +          if ( size != -1 ) {
    +            auto components = update_sink_->getComponents(componentStr);
    +            for (auto component : components) {
    +              component->start();
    +            }
    +          } else {
    +            logger_->log_debug("Connection broke with fd %d", fd);
    +          }
    +        }
    +        break;
    +        case Operation::STOP:
    +        {
    +          std::string componentStr;
    +          int size = stream.readUTF(componentStr);
    +          if ( size != -1 ) {
    +            auto components = update_sink_->getComponents(componentStr);
    +            for (auto component : components) {
    +              component->stop(true, 1000);
    +            }
    +          } else {
    +            logger_->log_debug("Connection broke with fd %d", fd);
    +          }
    +        }
    +        break;
    +        case Operation::CLEAR:
    +        {
    +          std::string connection;
    +          int size = stream.readUTF(connection);
    +          if ( size != -1 ) {
    +            update_sink_->clearConnection(connection);
    +          }
    +        }
    +        break;
    +        case Operation::UPDATE:
    +        {
    +          std::string what;
    +          int size = stream.readUTF(what);
    +          if (size == -1) {
    +            logger_->log_debug("Connection broke with fd %d", fd);
    +            break;
    +          }
    +          if (what == "flow") {
    +            std::string ff_loc;
    +            int size = stream.readUTF(ff_loc);
    +            std::ifstream tf(ff_loc);
    +            std::string configuration((std::istreambuf_iterator<char>(tf)),
    +                std::istreambuf_iterator<char>());
    +            if (size == -1) {
    +              logger_->log_debug("Connection broke with fd %d", fd);
    +              break;
    +            }
    +            update_sink_->applyUpdate(configuration);
    +          }
    +        }
    +        break;
    +        case Operation::DESCRIBE:
    +        {
    +          std::string what;
    +          int size = stream.readUTF(what);
    +          if (size == -1) {
    +            logger_->log_debug("Connection broke with fd %d", fd);
    +            break;
    +          }
    +          if (what == "queue") {
    +            std::string connection;
    +            int size = stream.readUTF(connection);
    +            if (size == -1) {
    +              logger_->log_debug("Connection broke with fd %d", fd);
    +              break;
    +            }
    +            std::stringstream response;
    +            {
    +              std::lock_guard<std::mutex> lock(controller_mutex_);
    +              response << queue_size_[connection] << " / " << 
queue_max_[connection];
    +            }
    +            io::BaseStream resp;
    +            resp.writeData(&head, 1);
    +            resp.writeUTF(response.str());
    +            write(fd, resp.getBuffer(), resp.getSize());
    +          } else if (what == "processors") {
    --- End diff --
    
    Minor: "processors" is a little broad given that we are getting all
components, so this is bundling in ports as well. Would "components" make more
sense?


---

Reply via email to