http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTProtocol.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTProtocol.cpp b/extensions/http-curl/protocols/RESTProtocol.cpp deleted file mode 100644 index afbe3c9..0000000 --- a/extensions/http-curl/protocols/RESTProtocol.cpp +++ /dev/null @@ -1,177 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "RESTProtocol.h" - -#include <algorithm> -#include <memory> -#include <utility> -#include <map> -#include <string> -#include <vector> - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) { - Json::Reader reader; - Json::Value root; - try { - if (reader.parse(std::string(response.data(), response.size()), root)) { - std::string requested_operation = getOperation(payload); - - std::string identifier; - if (root.isMember("operationid")) { - identifier = root["operationid"].asString(); - } - if (root["operation"].asString() == requested_operation) { - if (root["requested_operations"].size() == 0) { - return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true)); - } - C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true); - - new_payload.setIdentifier(identifier); - - for (const Json::Value& request : root["requested_operations"]) { - Operation newOp = stringToOperation(request["operation"].asString()); - C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true); - C2ContentResponse new_command(newOp); - new_command.delay = 0; - new_command.required = true; - new_command.ttl = -1; - // set the identifier if one exists - if (request.isMember("operationid")) { - new_command.ident = request["operationid"].asString(); - nested_payload.setIdentifier(new_command.ident); - } - new_command.name = request["name"].asString(); - - if (request.isMember("content") && request["content"].size() > 0) { - for (const auto &name : request["content"].getMemberNames()) { - new_command.operation_arguments[name] = request["content"][name].asString(); - } - } - nested_payload.addContent(std::move(new_command)); - new_payload.addPayload(std::move(nested_payload)); - } - // we have a response for this request - return new_payload; - } - } - } catch (...) 
{ - } - return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true)); -} - -Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) { - // get the name from the content - Json::Value json_payload; - std::map<std::string, std::vector<Json::Value>> children; - for (const auto &nested_payload : payload.getNestedPayloads()) { - Json::Value child_payload = serializeJsonPayload(json_payload, nested_payload); - children[nested_payload.getLabel()].push_back(child_payload); - } - for (auto child_vector : children) { - if (child_vector.second.size() > 1) { - Json::Value children_json(Json::arrayValue); - for (auto child : child_vector.second) { - json_payload[child_vector.first] = child; - } - } else { - if (child_vector.second.size() == 1) { - if (child_vector.second.at(0).isMember(child_vector.first)) { - json_payload[child_vector.first] = child_vector.second.at(0)[child_vector.first]; - } else { - json_payload[child_vector.first] = child_vector.second.at(0); - } - } - } - } - - const std::vector<C2ContentResponse> &content = payload.getContent(); - for (const auto &payload_content : content) { - Json::Value payload_content_values; - bool use_sub_option = true; - if (payload_content.op == payload.getOperation()) { - for (auto content : payload_content.operation_arguments) { - if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { - json_payload[payload_content.name] = content.second; - use_sub_option = false; - } else { - payload_content_values[content.first] = content.second; - } - } - } - if (use_sub_option) - json_payload[payload_content.name] = payload_content_values; - } - return json_payload; -} - -std::string RESTProtocol::getOperation(const C2Payload &payload) { - switch (payload.getOperation()) { - case Operation::ACKNOWLEDGE: - return "acknowledge"; - case Operation::HEARTBEAT: - return "heartbeat"; - case Operation::RESTART: - return "restart"; - case 
Operation::DESCRIBE: - return "describe"; - case Operation::STOP: - return "stop"; - case Operation::START: - return "start"; - case Operation::UPDATE: - return "update"; - default: - return "heartbeat"; - } -} - -Operation RESTProtocol::stringToOperation(const std::string str) { - std::string op = str; - std::transform(str.begin(), str.end(), op.begin(), ::tolower); - if (op == "heartbeat") { - return Operation::HEARTBEAT; - } else if (op == "acknowledge") { - return Operation::ACKNOWLEDGE; - } else if (op == "update") { - return Operation::UPDATE; - } else if (op == "describe") { - return Operation::DESCRIBE; - } else if (op == "restart") { - return Operation::RESTART; - } else if (op == "clear") { - return Operation::CLEAR; - } else if (op == "stop") { - return Operation::STOP; - } else if (op == "start") { - return Operation::START; - } - return Operation::HEARTBEAT; -} - -} /* namespace c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTProtocol.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTProtocol.h b/extensions/http-curl/protocols/RESTProtocol.h deleted file mode 100644 index 4767e77..0000000 --- a/extensions/http-curl/protocols/RESTProtocol.h +++ /dev/null @@ -1,75 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ -#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ - -#include "json/json.h" -#include "json/writer.h" -#include <string> -#include <mutex> - -#include "utils/ByteArrayCallback.h" -#include "CivetServer.h" -#include "c2/C2Protocol.h" -#include "c2/HeartBeatReporter.h" -#include "controllers/SSLContextService.h" -#include "utils/HTTPClient.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -/** - * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. - * - * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST - * and RECEIVE will perform a GET. 
This does not mean we can't receive on a POST; however, since Direction - * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. - * - */ -class RESTProtocol { - public: - RESTProtocol() { - - } - - virtual ~RESTProtocol() { - - } - - protected: - - virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload); - - virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response); - - virtual std::string getOperation(const C2Payload &payload); - - virtual Operation stringToOperation(const std::string str); - -}; - -} /* namesapce c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTOPERATIONS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTReceiver.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp deleted file mode 100644 index 4c46516..0000000 --- a/extensions/http-curl/protocols/RESTReceiver.cpp +++ /dev/null @@ -1,147 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "RESTReceiver.h" -#include <algorithm> -#include <memory> -#include <utility> -#include <map> -#include <string> -#include <vector> - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -int log_message(const struct mg_connection *conn, const char *message) { - puts(message); - return 1; -} - -int ssl_protocol_en(void *ssl_context, void *user_data) { - return 0; -} - -RESTReceiver::RESTReceiver(std::string name, uuid_t uuid) - : HeartBeatReporter(name, uuid), - logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) { -} - -void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { - HeartBeatReporter::initialize(controller, configure); - logger_->log_debug("Initializing rest receiveer"); - if (nullptr != configuration_) { - std::string listeningPort, rootUri, caCert; - configuration_->get("c2.rest.listener.port", listeningPort); - configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri); - configuration_->get("c2.rest.listener.cacert", caCert); - - if (!listeningPort.empty() && !rootUri.empty()) { - handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol()); - if (!caCert.empty()) { - listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert); - } else { - listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get())); - } - } - } -} -int16_t RESTReceiver::heartbeat(const C2Payload &payload) { - std::string operation_request_str = getOperation(payload); - std::string outputConfig; - Json::Value json_payload; - json_payload["operation"] = operation_request_str; - if (payload.getIdentifier().length() > 0) { - json_payload["operationid"] = payload.getIdentifier(); - } - const std::vector<C2ContentResponse> 
&content = payload.getContent(); - - for (const auto &payload_content : content) { - Json::Value payload_content_values; - bool use_sub_option = true; - if (payload_content.op == payload.getOperation()) { - for (auto content : payload_content.operation_arguments) { - if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { - json_payload[payload_content.name] = content.second; - use_sub_option = false; - } else { - payload_content_values[content.first] = content.second; - } - } - } - if (use_sub_option) - json_payload[payload_content.name] = payload_content_values; - } - - for (const auto &nested_payload : payload.getNestedPayloads()) { - json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); - } - - Json::StyledWriter writer; - outputConfig = writer.write(json_payload); - if (handler != nullptr) { - logger_->log_debug("Setting %s", outputConfig); - handler->setResponse(outputConfig); - } - - return 0; -} - -std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) { - struct mg_callbacks callback; - - memset(&callback, 0, sizeof(callback)); - callback.init_ssl = ssl_protocol_en; - std::string my_port = port; - my_port += "s"; - callback.log_message = log_message; - const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL", - "ssl_verify_peer", "no", "num_threads", "1", 0 }; - - std::vector<std::string> cpp_options; - for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); - - server->addHandler(rooturi, handler); - - return server; -} - -std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, 
CivetHandler *handler) { - const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 }; - - std::vector<std::string> cpp_options; - for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); - - server->addHandler(rooturi, handler); - - return server; -} - -} /* namespace c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTReceiver.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h deleted file mode 100644 index b0de62a..0000000 --- a/extensions/http-curl/protocols/RESTReceiver.h +++ /dev/null @@ -1,111 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ -#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ - -#include "RESTSender.h" -#include "json/json.h" -#include "json/writer.h" -#include <string> -#include <mutex> -#include "core/Resource.h" -#include "RESTProtocol.h" -#include "CivetServer.h" -#include "c2/C2Protocol.h" -#include "controllers/SSLContextService.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -int log_message(const struct mg_connection *conn, const char *message); - -int ssl_protocol_en(void *ssl_context, void *user_data); - -/** - * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. - * - * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST - * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction - * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. 
- * - */ -class RESTReceiver : public RESTProtocol, public HeartBeatReporter { - public: - RESTReceiver(std::string name, uuid_t uuid = nullptr); - - virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override; - virtual int16_t heartbeat(const C2Payload &heartbeat) override; - - protected: - - class ListeningProtocol : public CivetHandler { - - public: - ListeningProtocol() { - - } - - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::string currentvalue; - { - std::lock_guard<std::mutex> lock(reponse_mutex_); - currentvalue = resp_; - } - - std::stringstream output; - output << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << currentvalue.length() << "\r\nConnection: close\r\n\r\n"; - - mg_printf(conn, "%s", output.str().c_str()); - mg_printf(conn, "%s", currentvalue.c_str()); - return true; - } - - void setResponse(std::string response) { - std::lock_guard<std::mutex> lock(reponse_mutex_); - resp_ = response; - } - - protected: - std::mutex reponse_mutex_; - std::string resp_; - - }; - - std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert); - - std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler); - - std::unique_ptr<CivetServer> listener; - std::unique_ptr<ListeningProtocol> handler; - - private: - std::shared_ptr<logging::Logger> logger_; -}; - -REGISTER_RESOURCE(RESTReceiver); - -} /* namesapce c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTSender.cpp ---------------------------------------------------------------------- diff --git 
a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp deleted file mode 100644 index ebf532a..0000000 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ /dev/null @@ -1,140 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "RESTSender.h" - -#include <algorithm> -#include <memory> -#include <utility> -#include <map> -#include <string> -#include <vector> - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -RESTSender::RESTSender(std::string name, uuid_t uuid) - : C2Protocol(name, uuid), - logger_(logging::LoggerFactory<Connectable>::getLogger()) { -} - -void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { - C2Protocol::initialize(controller, configure); - // base URL when one is not specified. 
- if (nullptr != configure) { - configure->get("c2.rest.url", rest_uri_); - configure->get("c2.rest.url.ack", ack_uri_); - } - logger_->log_info("Submitting to %s", rest_uri_); -} -C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) { - std::string operation_request_str = getOperation(payload); - std::string outputConfig; - if (direction == Direction::TRANSMIT) { - Json::Value json_payload; - json_payload["operation"] = operation_request_str; - if (payload.getIdentifier().length() > 0) { - json_payload["operationid"] = payload.getIdentifier(); - } - const std::vector<C2ContentResponse> &content = payload.getContent(); - - for (const auto &payload_content : content) { - Json::Value payload_content_values; - bool use_sub_option = true; - if (payload_content.op == payload.getOperation()) { - for (auto content : payload_content.operation_arguments) { - if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { - json_payload[payload_content.name] = content.second; - use_sub_option = false; - } else { - payload_content_values[content.first] = content.second; - } - } - } - if (use_sub_option) - json_payload[payload_content.name] = payload_content_values; - } - - for (const auto &nested_payload : payload.getNestedPayloads()) { - json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); - } - - Json::StyledWriter writer; - outputConfig = writer.write(json_payload); - } - - return sendPayload(url, direction, payload, outputConfig); -} - -C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) { - if (payload.getOperation() == ACKNOWLEDGE) { - return consumePayload(ack_uri_, payload, direction, async); - } - return consumePayload(rest_uri_, payload, direction, async); -} - -void RESTSender::update(const std::shared_ptr<Configure> &configure) { - std::string url; - configure->get("c2.rest.url", 
url); - configure->get("c2.rest.url.ack", url); -} - -const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) { - utils::HTTPClient client(url, ssl_context_service_); - client.setConnectionTimeout(2); - - std::unique_ptr<utils::ByteInputCallBack> input = nullptr; - std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr; - if (direction == Direction::TRANSMIT) { - input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack()); - callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback); - input->write(outputConfig); - callback->ptr = input.get(); - callback->pos = 0; - client.set_request_method("POST"); - client.setUploadCallback(callback.get()); - } else { - // we do not need to set the uplaod callback - // since we are not uploading anything on a get - client.set_request_method("GET"); - } - client.setContentType("application/json"); - bool isOkay = client.submit(); - int64_t respCode = client.getResponseCode(); - - if (isOkay && respCode) { - if (payload.isRaw()) { - C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true); - - response_payload.setRawData(client.getResponseBody()); - return response_payload; - } - return parseJsonResponse(payload, client.getResponseBody()); - } else { - return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); - } -} - -} /* namespace c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTSender.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h deleted file mode 100644 index e4c1e5e..0000000 --- a/extensions/http-curl/protocols/RESTSender.h +++ 
/dev/null @@ -1,81 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ -#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ - -#include "json/json.h" -#include "json/writer.h" -#include <string> -#include <mutex> - -#include "utils/ByteArrayCallback.h" -#include "CivetServer.h" -#include "c2/C2Protocol.h" -#include "RESTProtocol.h" -#include "c2/HeartBeatReporter.h" -#include "controllers/SSLContextService.h" -#include "../client/HTTPClient.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -/** - * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. - * - * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST - * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction - * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. 
- * - */ -class RESTSender : public RESTProtocol, public C2Protocol { - public: - - explicit RESTSender(std::string name, uuid_t uuid = nullptr); - - virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override; - - virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override; - - virtual void update(const std::shared_ptr<Configure> &configure) override; - - virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override; - - protected: - - virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig); - - std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; - - private: - std::shared_ptr<logging::Logger> logger_; - std::string rest_uri_; - std::string ack_uri_; -}; - -REGISTER_RESOURCE(RESTSender); - -} /* namesapce c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/libarchive/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/libarchive/CMakeLists.txt b/extensions/libarchive/CMakeLists.txt index 8a8d37f..1ed8c5b 100644 --- a/extensions/libarchive/CMakeLists.txt +++ b/extensions/libarchive/CMakeLists.txt @@ -20,7 +20,7 @@ set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include ../../thirdparty/) 
+include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/jsoncpp/include ../../thirdparty/) find_package(LibArchive) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/rocksdb-repos/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/extensions/rocksdb-repos/CMakeLists.txt index cb1bdb1..5d459e5 100644 --- a/extensions/rocksdb-repos/CMakeLists.txt +++ b/extensions/rocksdb-repos/CMakeLists.txt @@ -23,7 +23,7 @@ cmake_minimum_required(VERSION 2.6) set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include ../../thirdparty/) +include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/rocksdb/include ../../thirdparty/) find_package(RocksDB) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/script/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/script/CMakeLists.txt b/extensions/script/CMakeLists.txt index 1501c68..927d974 100644 --- a/extensions/script/CMakeLists.txt +++ b/extensions/script/CMakeLists.txt @@ -37,7 +37,7 @@ endif() set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include 
../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include ../../thirdparty/) +include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/jsoncpp/include ../../thirdparty/) file(GLOB SOURCES "*.cpp") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 6ede84f..a40f070 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -55,7 +55,6 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-reorder") include_directories(../thirdparty/spdlog-20170710/include) include_directories(../thirdparty/yaml-cpp-yaml-cpp-20171024/include) -include_directories(${CIVET_THIRDPARTY_ROOT}/include) include_directories(../thirdparty/jsoncpp/include) include_directories(../thirdparty/concurrentqueue/) include_directories(include) @@ -73,7 +72,7 @@ file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*") add_library(spdlog STATIC ${SPD_SOURCES}) add_library(core-minifi STATIC ${SOURCES}) add_dependencies(core-minifi jsoncpp_project) -target_link_libraries(core-minifi ${UUID_LIBRARIES} ${JSONCPP_LIB} yaml-cpp ) +target_link_libraries(core-minifi ${UUID_LIBRARIES} ${JSONCPP_LIB} yaml-cpp dl) find_package(ZLIB REQUIRED) include_directories(${ZLIB_INCLUDE_DIRS}) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/include/c2/protocols/RESTProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h new file mode 100644 index 0000000..823b5a9 --- /dev/null +++ 
b/libminifi/include/c2/protocols/RESTProtocol.h @@ -0,0 +1,74 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ +#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ + +#include "json/json.h" +#include "json/writer.h" +#include <string> +#include <mutex> + +#include "utils/ByteArrayCallback.h" +#include "c2/C2Protocol.h" +#include "c2/HeartBeatReporter.h" +#include "controllers/SSLContextService.h" +#include "utils/HTTPClient.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. + * + * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST + * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction + * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. 
+ * + */ +class RESTProtocol { + public: + RESTProtocol() { + + } + + virtual ~RESTProtocol() { + + } + + protected: + /* Builds the JSON value for payload from its nested payloads and content entries; json_root is not read by the implementation added in this commit. */ + virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload); + /* Parses response as JSON and returns the resulting C2Payload; a READ_ERROR payload is returned when the body cannot be parsed or its "operation" differs from payload's operation name. */ + virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response); + /* Returns the lowercase name of payload's operation; operations without a dedicated case are reported as "heartbeat". */ + virtual std::string getOperation(const C2Payload &payload); + /* Maps an operation name, compared case-insensitively, to an Operation; unrecognised names yield Operation::HEARTBEAT. */ + virtual Operation stringToOperation(const std::string str); + +}; + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 6e644ef..eab7169 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -25,6 +25,7 @@ #include "core/controller/StandardControllerServiceProvider.h" #include "provenance/Provenance.h" #include "core/reporting/SiteToSiteProvenanceReportingTask.h" + #include "core/Processor.h" #include "core/Resource.h" #include "core/logging/LoggerConfiguration.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/include/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h deleted file mode 100644 index 1b58dcd..0000000 --- a/libminifi/include/processors/ListenHTTP.h +++ /dev/null @@ -1,121 +0,0 @@ -/** - * @file ListenHTTP.h - * ListenHTTP class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __LISTEN_HTTP_H__ -#define __LISTEN_HTTP_H__ - -#include <memory> -#include <regex> - -#include <CivetServer.h> - -#include "FlowFileRecord.h" -#include "core/Processor.h" -#include "core/ProcessSession.h" -#include "core/Core.h" -#include "core/Resource.h" -#include "core/logging/LoggerConfiguration.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -// ListenHTTP Class -class ListenHTTP : public core::Processor { - public: - - // Constructor - /*! 
- * Create a new processor - */ - ListenHTTP(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid), - logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) { - } - // Destructor - virtual ~ListenHTTP(); - // Processor Name - static constexpr char const* ProcessorName = "ListenHTTP"; - // Supported Properties - static core::Property BasePath; - static core::Property Port; - static core::Property AuthorizedDNPattern; - static core::Property SSLCertificate; - static core::Property SSLCertificateAuthority; - static core::Property SSLVerifyPeer; - static core::Property SSLMinimumVersion; - static core::Property HeadersAsAttributesRegex; - // Supported Relationships - static core::Relationship Success; - - void onTrigger(core::ProcessContext *context, core::ProcessSession *session); - void initialize(); - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); - - // HTTP request handler - class Handler : public CivetHandler { - public: - Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);bool handlePost( - CivetServer *server, struct mg_connection *conn); - - private: - // Send HTTP 500 error response to client - void sendErrorResponse(struct mg_connection *conn); - // Logger - std::shared_ptr<logging::Logger> logger_; - - std::regex _authDNRegex; - std::regex _headersAsAttributesRegex; - core::ProcessContext *_processContext; - core::ProcessSessionFactory *_processSessionFactory; - }; - - // Write callback for transferring data from HTTP request to content repo - class WriteCallback : public OutputStreamCallback { - public: - WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo); - int64_t process(std::shared_ptr<io::BaseStream> stream); - - private: - // Logger - std::shared_ptr<logging::Logger> logger_; - - struct mg_connection *_conn; - const struct mg_request_info *_reqInfo; - }; - - 
private: - // Logger - std::shared_ptr<logging::Logger> logger_; - - std::unique_ptr<CivetServer> _server; - std::unique_ptr<Handler> _handler; -}; - -REGISTER_RESOURCE(ListenHTTP); - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/src/c2/protocols/RESTProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp new file mode 100644 index 0000000..946e3c6 --- /dev/null +++ b/libminifi/src/c2/protocols/RESTProtocol.cpp @@ -0,0 +1,177 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "c2/protocols/RESTProtocol.h" + +#include <algorithm> +#include <memory> +#include <utility> +#include <map> +#include <string> +#include <vector> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { +/* Parses a C2 server response. Returns READ_COMPLETE when the response names the requested operation and lists no requested_operations, a NESTED payload carrying one nested payload per requested operation when it lists some, and READ_ERROR when the body is not valid JSON, names a different operation, or an exception is thrown while reading it. */ +const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) { + Json::Reader reader; + Json::Value root; + try { + if (reader.parse(std::string(response.data(), response.size()), root)) { + std::string requested_operation = getOperation(payload); + + std::string identifier; + if (root.isMember("operationid")) { + identifier = root["operationid"].asString(); + } + if (root["operation"].asString() == requested_operation) { + if (root["requested_operations"].size() == 0) { + return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true); + } + C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true); + + new_payload.setIdentifier(identifier); + + for (const Json::Value& request : root["requested_operations"]) { + Operation newOp = stringToOperation(request["operation"].asString()); + C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true); + C2ContentResponse new_command(newOp); + new_command.delay = 0; + new_command.required = true; + new_command.ttl = -1; + // set the identifier if one exists + if (request.isMember("operationid")) { + new_command.ident = request["operationid"].asString(); + nested_payload.setIdentifier(new_command.ident); + } + new_command.name = request["name"].asString(); + + if (request.isMember("content") && request["content"].size() > 0) { + for (const auto &name : request["content"].getMemberNames()) { + new_command.operation_arguments[name] = request["content"][name].asString(); + } + } + nested_payload.addContent(std::move(new_command)); + new_payload.addPayload(std::move(nested_payload)); + } + // we have a response for this request + return new_payload; + } + } + }
catch (...) { /* any exception raised while reading the JSON falls through to the READ_ERROR result below */ + } + return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); +} +/* Builds the JSON value for payload. Nested payloads are serialized recursively and stored under their label (several payloads sharing a label become one array). Each content entry is stored under its name: as the bare argument value when it has exactly one argument keyed by that name, otherwise as an object of its arguments; arguments are only emitted when the entry's op equals the payload's operation. json_root is not read. */ +Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) { + // get the name from the content + Json::Value json_payload; + std::map<std::string, std::vector<Json::Value>> children; + for (const auto &nested_payload : payload.getNestedPayloads()) { + Json::Value child_payload = serializeJsonPayload(json_payload, nested_payload); + children[nested_payload.getLabel()].push_back(child_payload); + } + for (const auto &child_vector : children) { + if (child_vector.second.size() > 1) { + // several nested payloads share this label: emit all of them as one JSON array instead of keeping only the last + Json::Value children_json(Json::arrayValue); + for (const auto &child : child_vector.second) { + children_json.append(child); + } + json_payload[child_vector.first] = children_json; + } else if (child_vector.second.size() == 1) { + if (child_vector.second.at(0).isMember(child_vector.first)) { + json_payload[child_vector.first] = child_vector.second.at(0)[child_vector.first]; + } else { + json_payload[child_vector.first] = child_vector.second.at(0); + } + } + } + + const std::vector<C2ContentResponse> &content = payload.getContent(); + for (const auto &payload_content : content) { + Json::Value payload_content_values; + bool use_sub_option = true; + if (payload_content.op == payload.getOperation()) { + for (const auto &argument : payload_content.operation_arguments) { + if (payload_content.operation_arguments.size() == 1 && payload_content.name == argument.first) { + json_payload[payload_content.name] = argument.second; + use_sub_option = false; + } else { + payload_content_values[argument.first] = argument.second; + } + } + } + if (use_sub_option) + json_payload[payload_content.name] = payload_content_values; + } + return json_payload; +} +/* Returns the lowercase name of payload's operation as compared against the "operation" field of server responses. */ +std::string RESTProtocol::getOperation(const C2Payload &payload) { + switch (payload.getOperation()) { + case Operation::ACKNOWLEDGE: + return "acknowledge"; + case Operation::HEARTBEAT: + return "heartbeat"; + case Operation::RESTART: + return
"restart"; + case Operation::DESCRIBE: + return "describe"; + case Operation::STOP: + return "stop"; + case Operation::START: + return "start"; + case Operation::UPDATE: + return "update"; + default: /* NOTE(review): Operation::CLEAR lands here and is reported as "heartbeat" although stringToOperation() accepts "clear" - confirm this is intended */ + return "heartbeat"; + } +} +/* Maps an operation name, compared case-insensitively, to an Operation; unrecognised names yield Operation::HEARTBEAT. */ +Operation RESTProtocol::stringToOperation(const std::string str) { + std::string op = str; + std::transform(str.begin(), str.end(), op.begin(), [](unsigned char c) { return static_cast<char>(::tolower(c)); }); /* tolower() is undefined for negative char values, so go through unsigned char */ + if (op == "heartbeat") { + return Operation::HEARTBEAT; + } else if (op == "acknowledge") { + return Operation::ACKNOWLEDGE; + } else if (op == "update") { + return Operation::UPDATE; + } else if (op == "describe") { + return Operation::DESCRIBE; + } else if (op == "restart") { + return Operation::RESTART; + } else if (op == "clear") { + return Operation::CLEAR; + } else if (op == "stop") { + return Operation::STOP; + } else if (op == "start") { + return Operation::START; + } + return Operation::HEARTBEAT; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/src/processors/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp deleted file mode 100644 index 62f8194..0000000 --- a/libminifi/src/processors/ListenHTTP.cpp +++ /dev/null @@ -1,333 +0,0 @@ -/** - * @file ListenHTTP.cpp - - * ListenHTTP class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "processors/ListenHTTP.h" -#include <uuid/uuid.h> -#include <CivetServer.h> -#include <stdio.h> -#include <sstream> -#include <utility> -#include <memory> -#include <string> -#include <iostream> -#include <fstream> -#include <set> -#include <vector> -#include "utils/TimeUtil.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/ProcessSessionFactory.h" -#include "core/logging/LoggerConfiguration.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener"); -core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", ""); -core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming" - " connections. 
If the Pattern does not match the DN, the connection will be refused.", - ".*"); -core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", ""); -core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", ""); -core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no"); -core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2"); -core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that" - " should be passed along as FlowFile attributes", - ""); - -core::Relationship ListenHTTP::Success("success", "All files are routed to success"); - -void ListenHTTP::initialize() { - logger_->log_info("Initializing ListenHTTP"); - - // Set the supported properties - std::set<core::Property> properties; - properties.insert(BasePath); - properties.insert(Port); - properties.insert(AuthorizedDNPattern); - properties.insert(SSLCertificate); - properties.insert(SSLCertificateAuthority); - properties.insert(SSLVerifyPeer); - properties.insert(SSLMinimumVersion); - properties.insert(HeadersAsAttributesRegex); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { - std::string basePath; - - if (!context->getProperty(BasePath.getName(), basePath)) { - logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str()); - basePath = 
BasePath.getValue(); - } - - basePath.insert(0, "/"); - - std::string listeningPort; - - if (!context->getProperty(Port.getName(), listeningPort)) { - logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str()); - return; - } - - std::string authDNPattern; - - if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) { - logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str()); - } - - std::string sslCertFile; - - if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) { - logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str()); - } - - // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set - std::string sslCertAuthorityFile; - std::string sslVerifyPeer; - std::string sslMinVer; - - if (!sslCertFile.empty()) { - if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) { - logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str()); - } - - if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) { - if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { - logger_->log_info("ListenHTTP will not verify peers"); - } else { - logger_->log_info("ListenHTTP will verify peers"); - } - } else { - logger_->log_info("ListenHTTP will not verify peers"); - } - - if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) { - logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str()); - } - } - - std::string headersAsAttributesPattern; - - if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) { - logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), 
headersAsAttributesPattern.c_str()); - } - - auto numThreads = getMaxConcurrentTasks(); - - logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads); - - // Initialize web server - std::vector<std::string> options; - options.push_back("enable_keep_alive"); - options.push_back("yes"); - options.push_back("keep_alive_timeout_ms"); - options.push_back("15000"); - options.push_back("num_threads"); - options.push_back(std::to_string(numThreads)); - - if (sslCertFile.empty()) { - options.push_back("listening_ports"); - options.push_back(listeningPort); - } else { - listeningPort += "s"; - options.push_back("listening_ports"); - options.push_back(listeningPort); - - options.push_back("ssl_certificate"); - options.push_back(sslCertFile); - - if (!sslCertAuthorityFile.empty()) { - options.push_back("ssl_ca_file"); - options.push_back(sslCertAuthorityFile); - } - - if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { - options.push_back("ssl_verify_peer"); - options.push_back("no"); - } else { - options.push_back("ssl_verify_peer"); - options.push_back("yes"); - } - - if (sslMinVer.compare("SSL2") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(0)); - } else if (sslMinVer.compare("SSL3") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(1)); - } else if (sslMinVer.compare("TLS1.0") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(2)); - } else if (sslMinVer.compare("TLS1.1") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(3)); - } else { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(4)); - } - } - - _server.reset(new CivetServer(options)); - _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern))); - _server->addHandler(basePath, 
_handler.get()); -} - -ListenHTTP::~ListenHTTP() { -} - -void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); - - // Do nothing if there are no incoming files - if (!flowFile) { - return; - } -} - -ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern) - : _authDNRegex(std::move(authDNPattern)), - _headersAsAttributesRegex(std::move(headersAsAttributesPattern)), - logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) { - _processContext = context; - _processSessionFactory = sessionFactory; -} - -void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) { - mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); -} - -bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) { - auto req_info = mg_get_request_info(conn); - logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length); - - // If this is a two-way TLS connection, authorize the peer against the configured pattern - if (req_info->is_ssl && req_info->client_cert != nullptr) { - if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) { - mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); - logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject); - return true; - } - } - - // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html) - mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n"); - - auto session = _processSessionFactory->createSession(); - ListenHTTP::WriteCallback callback(conn, req_info); - auto flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create()); - - if (!flowFile) { - sendErrorResponse(conn); - return true; - } - - try { - session->write(flowFile, &callback); - - // Add filename from "filename" header value (and pattern headers) - for (int i = 0; i < req_info->num_headers; i++) { - auto header = &req_info->http_headers[i]; - - if (strcmp("filename", header->name) == 0) { - if (!flowFile->updateAttribute("filename", header->value)) { - flowFile->addAttribute("filename", header->value); - } - } else if (std::regex_match(header->name, _headersAsAttributesRegex)) { - if (!flowFile->updateAttribute(header->name, header->value)) { - flowFile->addAttribute(header->name, header->value); - } - } - } - - session->transfer(flowFile, Success); - session->commit(); - } catch (std::exception &exception) { - logger_->log_debug("ListenHTTP Caught Exception %s", exception.what()); - sendErrorResponse(conn); - session->rollback(); - throw; - } catch (...) { - logger_->log_debug("ListenHTTP Caught Exception Processor::onTrigger"); - sendErrorResponse(conn); - session->rollback(); - throw; - } - - mg_printf(conn, "HTTP/1.1 200 OK\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); - - return true; -} - -ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) - : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) { - _conn = conn; - _reqInfo = reqInfo; -} - -int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) { - int64_t rlen; - int64_t nlen = 0; - int64_t tlen = _reqInfo->content_length; - uint8_t buf[16384]; - - // if we have no content length we should call mg_read until - // there is no data left from the stream to be HTTP/1.1 compliant - while (tlen == -1 || nlen < tlen) { - rlen = tlen == -1 ? 
sizeof(buf) : tlen - nlen; - - if (rlen > (int64_t)sizeof(buf)) { - rlen = (int64_t)sizeof(buf); - } - - // Read a buffer of data from client - rlen = mg_read(_conn, &buf[0], (size_t) rlen); - - if (rlen <= 0) { - break; - } - - // Transfer buffer data to the output stream - stream->write(&buf[0], rlen); - - nlen += rlen; - } - - return nlen; -} - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/TestServer.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestServer.h b/libminifi/test/TestServer.h deleted file mode 100644 index 06f996c..0000000 --- a/libminifi/test/TestServer.h +++ /dev/null @@ -1,117 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef LIBMINIFI_TEST_TESTSERVER_H_ -#define LIBMINIFI_TEST_TESTSERVER_H_ -#include <regex.h> -#include <string> -#include <iostream> -#include "civetweb.h" -#include "CivetServer.h" - - -/* Server context handle */ -static std::string resp_str; - -void init_webserver() { - mg_init_library(0); -} - - -CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) { - const char *options[] = { "listening_ports", port.c_str(), "error_log_file", - "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", - "ALL", "ssl_verify_peer", "no", 0 }; - - std::vector<std::string> cpp_options; - for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - CivetServer *server = new CivetServer(cpp_options); - - server->addHandler(rooturi, handler); - - return server; - -} - -CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) { - const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 }; - - std::vector<std::string> cpp_options; - for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - CivetServer *server = new CivetServer(cpp_options); - - server->addHandler(rooturi, handler); - - return server; - -} - -bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) { - regex_t regex; - - const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$"; - - int ret = regcomp(®ex, regexstr, REG_EXTENDED); - if (ret) { - return false; - } - - size_t potentialGroups = regex.re_nsub + 1; - regmatch_t groups[potentialGroups]; - if (regexec(®ex, url.c_str(), potentialGroups, groups, 0) == 0) { - for (int i = 0; i < potentialGroups; i++) { - if (groups[i].rm_so == -1) - break; - - std::string str(url.data() + 
groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so); - switch (i) { - case 1: - scheme = str; - break; - case 3: - port = str; - break; - case 4: - path = str; - break; - default: - break; - } - } - } - if (path.empty() || scheme.empty() || port.empty()) - return false; - - regfree(®ex); - - return true; - -} - -static void stop_webserver(CivetServer *server) { - if (server != nullptr) - delete server; - - /* Un-initialize the library */ - mg_exit_library(); -} - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2NullConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/C2NullConfiguration.cpp b/libminifi/test/curl-tests/C2NullConfiguration.cpp deleted file mode 100644 index cbb1831..0000000 --- a/libminifi/test/curl-tests/C2NullConfiguration.cpp +++ /dev/null @@ -1,135 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "CivetServer.h" -#include "RemoteProcessorGroupPort.h" -#include "core/ConfigurableComponent.h" -#include "controllers/SSLContextService.h" -#include "../TestServer.h" -#include "c2/C2Agent.h" -#include "protocols/RESTReceiver.h" -#include "../integration/IntegrationBase.h" -#include "processors/LogAttribute.h" - -class VerifyC2Server : public IntegrationBase { - public: - explicit VerifyC2Server(bool isSecure) - : isSecure(isSecure) { - char format[] = "/tmp/ssth.XXXXXX"; - dir = testController.createTempDirectory(format); - } - - void testSetup() { - LogTestController::getInstance().setDebug<utils::HTTPClient>(); - LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); - LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); - LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); - LogTestController::getInstance().setDebug<processors::LogAttribute>(); - LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); - std::fstream file; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - } - - void cleanup() { - unlink(ss.str().c_str()); - } - - void runAssertions() { - assert(LogTestController::getInstance().contains("C2Agent] [info] Class is null") == true); - 
assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true); - assert(LogTestController::getInstance().contains("Class is RESTSender") == true); - } - - void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { - std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); - assert(proc != nullptr); - - std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); - - assert(inv != nullptr); - std::string url = ""; - inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); - - - std::string port, scheme, path; - parse_http_components(url, port, scheme, path); - configuration->set("c2.agent.protocol.class", "null"); - configuration->set("c2.rest.url", ""); - configuration->set("c2.rest.url.ack", ""); - configuration->set("c2.agent.heartbeat.reporter.classes", "null"); - configuration->set("c2.rest.listener.port", "null"); - configuration->set("c2.agent.heartbeat.period", "null"); - configuration->set("c2.rest.listener.heartbeat.rooturi", "null"); - } - - protected: - bool isSecure; - char *dir; - std::stringstream ss; - TestController testController; -}; - -int main(int argc, char **argv) { - std::string key_dir, test_file_location, url; - if (argc > 1) { - test_file_location = argv[1]; - key_dir = argv[2]; - } - - bool isSecure = false; - if (url.find("https") != std::string::npos) { - isSecure = true; - } - - VerifyC2Server harness(isSecure); - - harness.setKeyDir(key_dir); - - harness.run(test_file_location); - - return 0; -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2UpdateTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/C2UpdateTest.cpp b/libminifi/test/curl-tests/C2UpdateTest.cpp deleted file mode 100644 index 1a9fe5b..0000000 --- a/libminifi/test/curl-tests/C2UpdateTest.cpp +++ /dev/null @@ -1,183 +0,0 
@@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "c2/C2Agent.h" -#include "CivetServer.h" -#include <cstring> -#include "RESTSender.h" - -void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(10)); -} - -static std::vector<std::string> responses; - -class ConfigHandler : public CivetHandler { - public: - ConfigHandler() { - calls_ = 0; - } - bool handlePost(CivetServer *server, struct mg_connection *conn) { - calls_++; - if (responses.size() > 0) { - std::string top_str = responses.back(); - responses.pop_back(); - mg_printf(conn, 
"HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - top_str.length()); - mg_printf(conn, "%s", top_str.c_str()); - } else { - mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); - } - - return true; - } - - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::ifstream myfile(test_file_location_.c_str()); - - if (myfile.is_open()) { - std::stringstream buffer; - buffer << myfile.rdbuf(); - std::string str = buffer.str(); - myfile.close(); - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - str.length()); - mg_printf(conn, "%s", str.c_str()); - } else { - mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); - } - - return true; - } - std::string test_file_location_; - std::atomic<size_t> calls_; -}; - -int main(int argc, char **argv) { - mg_init_library(0); - LogTestController::getInstance().setInfo<minifi::FlowController>(); - LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); - LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); - LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); - - const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; - std::vector<std::string> cpp_options; - for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - - CivetServer server(cpp_options); - ConfigHandler h_ex; - server.addHandler("/update", h_ex); - std::string key_dir, test_file_location; - if (argc > 1) { - h_ex.test_file_location_ = test_file_location = argv[1]; - key_dir = argv[2]; - } - std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" - "\"operation\" : \"update\", " - "\"operationid\" : \"8675309\", " - "\"name\": \"configuration\"" - "}]}"; - - responses.push_back(heartbeat_response); - - std::ifstream myfile(test_file_location.c_str()); - - 
if (myfile.is_open()) { - std::stringstream buffer; - buffer << myfile.rdbuf(); - std::string str = buffer.str(); - myfile.close(); - std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" - "\"operation\" : \"update\", " - "\"operationid\" : \"8675309\", " - "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}"; - responses.push_back(response); - } - - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - - configuration->set("c2.rest.url", "http://localhost:9090/update"); - configuration->set("c2.agent.heartbeat.period", "1000"); - mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); - - std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, - true); - - core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); - - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); - 
std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); - ptr.release(); - auto start = std::chrono::system_clock::now(); - - controller->load(); - controller->start(); - waitToVerifyProcessor(); - - controller->waitUnload(60000); - auto then = std::chrono::system_clock::now(); - - auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count(); - std::string logs = LogTestController::getInstance().log_output.str(); - assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); - LogTestController::getInstance().reset(); - rmdir("./content_repository"); - assert(h_ex.calls_ <= (milliseconds / 1000) + 1); - - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp b/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp deleted file mode 100644 index 69c9a3b..0000000 --- a/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "CivetServer.h" -#include "RemoteProcessorGroupPort.h" -#include "core/ConfigurableComponent.h" -#include "controllers/SSLContextService.h" -#include "../TestServer.h" -#include "c2/C2Agent.h" -#include "RESTReceiver.h" -#include "../integration/IntegrationBase.h" -#include "processors/LogAttribute.h" - -class Responder : public CivetHandler { - public: - explicit Responder(bool isSecure) - : isSecure(isSecure) { - } - bool handlePost(CivetServer *server, struct mg_connection *conn) { - std::string resp = - "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, " - "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}"; - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - resp.length()); - mg_printf(conn, "%s", resp.c_str()); - return true; - } - - protected: - bool isSecure; -}; - -class VerifyC2Heartbeat : public IntegrationBase { - public: - explicit VerifyC2Heartbeat(bool isSecure) - : isSecure(isSecure) { - char format[] = "/tmp/ssth.XXXXXX"; - dir = testController.createTempDirectory(format); - } - - void testSetup() { - 
LogTestController::getInstance().setDebug<utils::HTTPClient>(); - LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); - LogTestController::getInstance().setDebug<LogTestController>(); - LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); - LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>(); - LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); - std::fstream file; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - } - - void cleanup() { - LogTestController::getInstance().reset(); - unlink(ss.str().c_str()); - } - - void runAssertions() { - assert(LogTestController::getInstance().contains("Received Ack from Server") == true); - - assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true); - - assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true); - } - - void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { - std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); - assert(proc != nullptr); - - std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); - - assert(inv != nullptr); - std::string url = ""; - inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); - - configuration->set("c2.rest.url", "http://localhost:8888/api/heartbeat"); - configuration->set("c2.agent.heartbeat.period", "1000"); - configuration->set("c2.rest.url.ack", "http://localhost:8888/api/heartbeat"); - } - - protected: - bool isSecure; - char *dir; - std::stringstream ss; - TestController testController; -}; - -int main(int argc, char **argv) { - std::string key_dir, test_file_location, url; - url = "http://localhost:8888/api/heartbeat"; - if (argc > 1) { - test_file_location = argv[1]; - key_dir = argv[2]; - } - - bool isSecure = false; - if (url.find("https") 
!= std::string::npos) { - isSecure = true; - } - - VerifyC2Heartbeat harness(isSecure); - - harness.setKeyDir(key_dir); - - Responder responder(isSecure); - - harness.setUrl(url, &responder); - - harness.run(test_file_location); - - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2VerifyServeResults.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/C2VerifyServeResults.cpp b/libminifi/test/curl-tests/C2VerifyServeResults.cpp deleted file mode 100644 index 961fec0..0000000 --- a/libminifi/test/curl-tests/C2VerifyServeResults.cpp +++ /dev/null @@ -1,131 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "CivetServer.h" -#include "RemoteProcessorGroupPort.h" -#include "core/ConfigurableComponent.h" -#include "controllers/SSLContextService.h" -#include "../TestServer.h" -#include "c2/C2Agent.h" -#include "RESTReceiver.h" -#include "../integration/IntegrationBase.h" -#include "processors/LogAttribute.h" - -class VerifyC2Server : public IntegrationBase { - public: - explicit VerifyC2Server(bool isSecure) - : isSecure(isSecure) { - char format[] = "/tmp/ssth.XXXXXX"; - dir = testController.createTempDirectory(format); - } - - void testSetup() { - LogTestController::getInstance().setDebug<utils::HTTPClient>(); - LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); - LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); - LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); - LogTestController::getInstance().setDebug<processors::LogAttribute>(); - LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); - std::fstream file; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - } - - void cleanup() { - unlink(ss.str().c_str()); - } - - void runAssertions() { - assert(LogTestController::getInstance().contains("Import offset 0") == true); - - 
assert(LogTestController::getInstance().contains("Outputting success and response") == true); - } - - void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { - std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); - assert(proc != nullptr); - - std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); - - assert(inv != nullptr); - std::string url = ""; - inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); - - - std::string port, scheme, path; - parse_http_components(url, port, scheme, path); - configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver"); - configuration->set("c2.rest.listener.port", port); - configuration->set("c2.agent.heartbeat.period", "10"); - configuration->set("c2.rest.listener.heartbeat.rooturi", path); - } - - protected: - bool isSecure; - char *dir; - std::stringstream ss; - TestController testController; -}; - -int main(int argc, char **argv) { - std::string key_dir, test_file_location, url; - if (argc > 1) { - test_file_location = argv[1]; - key_dir = argv[2]; - } - - bool isSecure = false; - if (url.find("https") != std::string::npos) { - isSecure = true; - } - - VerifyC2Server harness(isSecure); - - harness.setKeyDir(key_dir); - - harness.run(test_file_location); - - return 0; -}
