http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/RESTSender.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/RESTSender.cpp b/extensions/http-curl/RESTSender.cpp deleted file mode 100644 index 839c70b..0000000 --- a/extensions/http-curl/RESTSender.cpp +++ /dev/null @@ -1,140 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "c2/protocols/RESTSender.h" - -#include <algorithm> -#include <memory> -#include <utility> -#include <map> -#include <string> -#include <vector> - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -RESTSender::RESTSender(std::string name, uuid_t uuid) - : C2Protocol(name, uuid), - logger_(logging::LoggerFactory<Connectable>::getLogger()) { -} - -void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { - C2Protocol::initialize(controller, configure); - // base URL when one is not specified. 
- if (nullptr != configure) { - configure->get("c2.rest.url", rest_uri_); - configure->get("c2.rest.url.ack", ack_uri_); - } - logger_->log_info("Submitting to %s", rest_uri_); -} -C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) { - std::string operation_request_str = getOperation(payload); - std::string outputConfig; - if (direction == Direction::TRANSMIT) { - Json::Value json_payload; - json_payload["operation"] = operation_request_str; - if (payload.getIdentifier().length() > 0) { - json_payload["operationid"] = payload.getIdentifier(); - } - const std::vector<C2ContentResponse> &content = payload.getContent(); - - for (const auto &payload_content : content) { - Json::Value payload_content_values; - bool use_sub_option = true; - if (payload_content.op == payload.getOperation()) { - for (auto content : payload_content.operation_arguments) { - if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { - json_payload[payload_content.name] = content.second; - use_sub_option = false; - } else { - payload_content_values[content.first] = content.second; - } - } - } - if (use_sub_option) - json_payload[payload_content.name] = payload_content_values; - } - - for (const auto &nested_payload : payload.getNestedPayloads()) { - json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); - } - - Json::StyledWriter writer; - outputConfig = writer.write(json_payload); - } - - return sendPayload(url, direction, payload, outputConfig); -} - -C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) { - if (payload.getOperation() == ACKNOWLEDGE) { - return consumePayload(ack_uri_, payload, direction, async); - } - return consumePayload(rest_uri_, payload, direction, async); -} - -void RESTSender::update(const std::shared_ptr<Configure> &configure) { - std::string url; - configure->get("c2.rest.url", 
url); - configure->get("c2.rest.url.ack", url); -} - -const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) { - utils::HTTPClient client(url, ssl_context_service_); - client.setConnectionTimeout(2); - - std::unique_ptr<utils::ByteInputCallBack> input = nullptr; - std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr; - if (direction == Direction::TRANSMIT) { - input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack()); - callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback); - input->write(outputConfig); - callback->ptr = input.get(); - callback->pos = 0; - client.set_request_method("POST"); - client.setUploadCallback(callback.get()); - } else { - // we do not need to set the uplaod callback - // since we are not uploading anything on a get - client.set_request_method("GET"); - } - client.setContentType("application/json"); - bool isOkay = client.submit(); - int64_t respCode = client.getResponseCode(); - - if (isOkay && respCode) { - if (payload.isRaw()) { - C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true); - - response_payload.setRawData(client.getResponseBody()); - return response_payload; - } - return parseJsonResponse(payload, client.getResponseBody()); - } else { - return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); - } -} - -} /* namespace c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/c2/protocols/RESTSender.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/c2/protocols/RESTSender.h b/extensions/http-curl/c2/protocols/RESTSender.h deleted file mode 100644 index 450799c..0000000 --- a/extensions/http-curl/c2/protocols/RESTSender.h +++ /dev/null @@ -1,80 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ -#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ - -#include "json/json.h" -#include "json/writer.h" -#include <string> -#include <mutex> - -#include "utils/ByteArrayCallback.h" -#include "c2/C2Protocol.h" -#include "c2/protocols/RESTProtocol.h" -#include "c2/HeartBeatReporter.h" -#include "controllers/SSLContextService.h" -#include "../client/HTTPClient.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -/** - * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. - * - * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST - * and RECEIVE will perform a GET. 
This does not mean we can't receive on a POST; however, since Direction - * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. - * - */ -class RESTSender : public RESTProtocol, public C2Protocol { - public: - - explicit RESTSender(std::string name, uuid_t uuid = nullptr); - - virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override; - - virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override; - - virtual void update(const std::shared_ptr<Configure> &configure) override; - - virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override; - - protected: - - virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig); - - std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; - - private: - std::shared_ptr<logging::Logger> logger_; - std::string rest_uri_; - std::string ack_uri_; -}; - -REGISTER_RESOURCE(RESTSender); - -} /* namesapce c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/client/HTTPClient.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h index 85000d9..ace479c 100644 --- a/extensions/http-curl/client/HTTPClient.h +++ b/extensions/http-curl/client/HTTPClient.h @@ -198,10 +198,6 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable { CURLcode res; - - - - CURL *http_session_; std::string method_; 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/protocols/RESTReceiver.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp new file mode 100644 index 0000000..4c46516 --- /dev/null +++ b/extensions/http-curl/protocols/RESTReceiver.cpp @@ -0,0 +1,147 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "RESTReceiver.h" +#include <algorithm> +#include <memory> +#include <utility> +#include <map> +#include <string> +#include <vector> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_protocol_en(void *ssl_context, void *user_data) { + return 0; +} + +RESTReceiver::RESTReceiver(std::string name, uuid_t uuid) + : HeartBeatReporter(name, uuid), + logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) { +} + +void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { + HeartBeatReporter::initialize(controller, configure); + logger_->log_debug("Initializing rest receiveer"); + if (nullptr != configuration_) { + std::string listeningPort, rootUri, caCert; + configuration_->get("c2.rest.listener.port", listeningPort); + configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri); + configuration_->get("c2.rest.listener.cacert", caCert); + + if (!listeningPort.empty() && !rootUri.empty()) { + handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol()); + if (!caCert.empty()) { + listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert); + } else { + listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get())); + } + } + } +} +int16_t RESTReceiver::heartbeat(const C2Payload &payload) { + std::string operation_request_str = getOperation(payload); + std::string outputConfig; + Json::Value json_payload; + json_payload["operation"] = operation_request_str; + if (payload.getIdentifier().length() > 0) { + json_payload["operationid"] = payload.getIdentifier(); + } + const std::vector<C2ContentResponse> &content = payload.getContent(); + + for (const auto &payload_content : content) { + Json::Value 
payload_content_values; + bool use_sub_option = true; + if (payload_content.op == payload.getOperation()) { + for (auto content : payload_content.operation_arguments) { + if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { + json_payload[payload_content.name] = content.second; + use_sub_option = false; + } else { + payload_content_values[content.first] = content.second; + } + } + } + if (use_sub_option) + json_payload[payload_content.name] = payload_content_values; + } + + for (const auto &nested_payload : payload.getNestedPayloads()) { + json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); + } + + Json::StyledWriter writer; + outputConfig = writer.write(json_payload); + if (handler != nullptr) { + logger_->log_debug("Setting %s", outputConfig); + handler->setResponse(outputConfig); + } + + return 0; +} + +std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) { + struct mg_callbacks callback; + + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_protocol_en; + std::string my_port = port; + my_port += "s"; + callback.log_message = log_message; + const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL", + "ssl_verify_peer", "no", "num_threads", "1", 0 }; + + std::vector<std::string> cpp_options; + for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); + + server->addHandler(rooturi, handler); + + return server; +} + +std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) { + const char *options[] = { "document_root", ".", "listening_ports", 
port.c_str(), "num_threads", "1", 0 }; + + std::vector<std::string> cpp_options; + for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); + + server->addHandler(rooturi, handler); + + return server; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/protocols/RESTReceiver.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h new file mode 100644 index 0000000..4793ee3 --- /dev/null +++ b/extensions/http-curl/protocols/RESTReceiver.h @@ -0,0 +1,110 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ +#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ + +#include "json/json.h" +#include "json/writer.h" +#include <string> +#include <mutex> +#include "core/Resource.h" +#include "c2/protocols/RESTProtocol.h" +#include "CivetServer.h" +#include "c2/C2Protocol.h" +#include "controllers/SSLContextService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +int log_message(const struct mg_connection *conn, const char *message); + +int ssl_protocol_en(void *ssl_context, void *user_data); + +/** + * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. + * + * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST + * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction + * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. 
+ * + */ +class RESTReceiver : public RESTProtocol, public HeartBeatReporter { + public: + RESTReceiver(std::string name, uuid_t uuid = nullptr); + + virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override; + virtual int16_t heartbeat(const C2Payload &heartbeat) override; + + protected: + + class ListeningProtocol : public CivetHandler { + + public: + ListeningProtocol() { + + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string currentvalue; + { + std::lock_guard<std::mutex> lock(reponse_mutex_); + currentvalue = resp_; + } + + std::stringstream output; + output << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << currentvalue.length() << "\r\nConnection: close\r\n\r\n"; + + mg_printf(conn, "%s", output.str().c_str()); + mg_printf(conn, "%s", currentvalue.c_str()); + return true; + } + + void setResponse(std::string response) { + std::lock_guard<std::mutex> lock(reponse_mutex_); + resp_ = response; + } + + protected: + std::mutex reponse_mutex_; + std::string resp_; + + }; + + std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert); + + std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler); + + std::unique_ptr<CivetServer> listener; + std::unique_ptr<ListeningProtocol> handler; + + private: + std::shared_ptr<logging::Logger> logger_; +}; + +REGISTER_RESOURCE(RESTReceiver); + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/protocols/RESTSender.cpp ---------------------------------------------------------------------- diff --git 
a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp new file mode 100644 index 0000000..ebf532a --- /dev/null +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -0,0 +1,140 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "RESTSender.h" + +#include <algorithm> +#include <memory> +#include <utility> +#include <map> +#include <string> +#include <vector> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +RESTSender::RESTSender(std::string name, uuid_t uuid) + : C2Protocol(name, uuid), + logger_(logging::LoggerFactory<Connectable>::getLogger()) { +} + +void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { + C2Protocol::initialize(controller, configure); + // base URL when one is not specified. 
+ if (nullptr != configure) { + configure->get("c2.rest.url", rest_uri_); + configure->get("c2.rest.url.ack", ack_uri_); + } + logger_->log_info("Submitting to %s", rest_uri_); +} +C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) { + std::string operation_request_str = getOperation(payload); + std::string outputConfig; + if (direction == Direction::TRANSMIT) { + Json::Value json_payload; + json_payload["operation"] = operation_request_str; + if (payload.getIdentifier().length() > 0) { + json_payload["operationid"] = payload.getIdentifier(); + } + const std::vector<C2ContentResponse> &content = payload.getContent(); + + for (const auto &payload_content : content) { + Json::Value payload_content_values; + bool use_sub_option = true; + if (payload_content.op == payload.getOperation()) { + for (auto content : payload_content.operation_arguments) { + if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { + json_payload[payload_content.name] = content.second; + use_sub_option = false; + } else { + payload_content_values[content.first] = content.second; + } + } + } + if (use_sub_option) + json_payload[payload_content.name] = payload_content_values; + } + + for (const auto &nested_payload : payload.getNestedPayloads()) { + json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); + } + + Json::StyledWriter writer; + outputConfig = writer.write(json_payload); + } + + return sendPayload(url, direction, payload, outputConfig); +} + +C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) { + if (payload.getOperation() == ACKNOWLEDGE) { + return consumePayload(ack_uri_, payload, direction, async); + } + return consumePayload(rest_uri_, payload, direction, async); +} + +void RESTSender::update(const std::shared_ptr<Configure> &configure) { + std::string url; + configure->get("c2.rest.url", 
url); + configure->get("c2.rest.url.ack", url); +} + +const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) { + utils::HTTPClient client(url, ssl_context_service_); + client.setConnectionTimeout(2); + + std::unique_ptr<utils::ByteInputCallBack> input = nullptr; + std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr; + if (direction == Direction::TRANSMIT) { + input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack()); + callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback); + input->write(outputConfig); + callback->ptr = input.get(); + callback->pos = 0; + client.set_request_method("POST"); + client.setUploadCallback(callback.get()); + } else { + // we do not need to set the uplaod callback + // since we are not uploading anything on a get + client.set_request_method("GET"); + } + client.setContentType("application/json"); + bool isOkay = client.submit(); + int64_t respCode = client.getResponseCode(); + + if (isOkay && respCode) { + if (payload.isRaw()) { + C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true); + + response_payload.setRawData(client.getResponseBody()); + return response_payload; + } + return parseJsonResponse(payload, client.getResponseBody()); + } else { + return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); + } +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/protocols/RESTSender.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h new file mode 100644 index 0000000..450799c --- /dev/null +++ b/extensions/http-curl/protocols/RESTSender.h 
@@ -0,0 +1,80 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ +#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ + +#include "json/json.h" +#include "json/writer.h" +#include <string> +#include <mutex> + +#include "utils/ByteArrayCallback.h" +#include "c2/C2Protocol.h" +#include "c2/protocols/RESTProtocol.h" +#include "c2/HeartBeatReporter.h" +#include "controllers/SSLContextService.h" +#include "../client/HTTPClient.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. + * + * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST + * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction + * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. 
+ * + */ +class RESTSender : public RESTProtocol, public C2Protocol { + public: + + explicit RESTSender(std::string name, uuid_t uuid = nullptr); + + virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override; + + virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override; + + virtual void update(const std::shared_ptr<Configure> &configure) override; + + virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override; + + protected: + + virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig); + + std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; + + private: + std::shared_ptr<logging::Logger> logger_; + std::string rest_uri_; + std::string ack_uri_; +}; + +REGISTER_RESOURCE(RESTSender); + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/C2NullConfiguration.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/C2NullConfiguration.cpp b/extensions/http-curl/tests/C2NullConfiguration.cpp new file mode 100644 index 0000000..934cf02 --- /dev/null +++ b/extensions/http-curl/tests/C2NullConfiguration.cpp @@ -0,0 +1,136 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "protocols/RESTReceiver.h" +#include "protocols/RESTSender.h" +#include "c2/C2Agent.h" +#include "processors/LogAttribute.h" +#include "HTTPIntegrationBase.h" + +class VerifyC2Server : public HTTPIntegrationBase { + public: + explicit VerifyC2Server(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + 
LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("C2Agent] [info] Class is null") == true); + assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true); + assert(LogTestController::getInstance().contains("Class is RESTSender") == true); + } + + void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + + + std::string port, scheme, path; + parse_http_components(url, port, scheme, path); + configuration->set("c2.agent.protocol.class", "null"); + configuration->set("c2.rest.url", ""); + configuration->set("c2.rest.url.ack", ""); + configuration->set("c2.agent.heartbeat.reporter.classes", "null"); + configuration->set("c2.rest.listener.port", "null"); + configuration->set("c2.agent.heartbeat.period", "null"); + configuration->set("c2.rest.listener.heartbeat.rooturi", "null"); + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + bool isSecure = false; + if 
(url.find("https") != std::string::npos) { + isSecure = true; + } + + VerifyC2Server harness(isSecure); + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/C2UpdateTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp new file mode 100644 index 0000000..f21084b --- /dev/null +++ b/extensions/http-curl/tests/C2UpdateTest.cpp @@ -0,0 +1,183 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include <cstring> +#include "protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +static std::vector<std::string> responses; + +class ConfigHandler : public CivetHandler { + public: + ConfigHandler() { + calls_ = 0; + } + bool handlePost(CivetServer *server, struct mg_connection *conn) { + calls_++; + if (responses.size() > 0) { + std::string top_str = responses.back(); + responses.pop_back(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + top_str.length()); + mg_printf(conn, "%s", top_str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string 
test_file_location_; + std::atomic<size_t> calls_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + + const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\"" + "}]}"; + + responses.push_back(heartbeat_response); + + std::ifstream myfile(test_file_location.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}"; + responses.push_back(response); + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + + configuration->set("c2.rest.url", "http://localhost:9090/update"); + configuration->set("c2.agent.heartbeat.period", "1000"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + 
std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); + ptr.release(); + auto start = std::chrono::system_clock::now(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + auto then = std::chrono::system_clock::now(); + + auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count(); + std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + assert(h_ex.calls_ <= (milliseconds / 1000) + 1); + + return 0; +} 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp new file mode 100644 index 0000000..adb2db1 --- /dev/null +++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp @@ -0,0 +1,156 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "c2/C2Agent.h" +#include "protocols/RESTReceiver.h" +#include "protocols/RESTSender.h" +#include "HTTPIntegrationBase.h" +#include "processors/LogAttribute.h" + +class Responder : public CivetHandler { + public: + explicit Responder(bool isSecure) + : isSecure(isSecure) { + } + bool handlePost(CivetServer *server, struct mg_connection *conn) { + std::string resp = + "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, " + "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + resp.length()); + mg_printf(conn, "%s", resp.c_str()); + return true; + } + + protected: + bool isSecure; +}; + +class VerifyC2Heartbeat : public HTTPIntegrationBase { + public: + explicit VerifyC2Heartbeat(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + 
LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<LogTestController>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + LogTestController::getInstance().reset(); + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("Received Ack from Server") == true); + + assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true); + + assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true); + } + + void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + + configuration->set("c2.rest.url", "http://localhost:8888/api/heartbeat"); + configuration->set("c2.agent.heartbeat.period", "1000"); + configuration->set("c2.rest.url.ack", "http://localhost:8888/api/heartbeat"); + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + url = "http://localhost:8888/api/heartbeat"; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + 
VerifyC2Heartbeat harness(isSecure); + + harness.setKeyDir(key_dir); + + Responder responder(isSecure); + + harness.setUrl(url, &responder); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/C2VerifyServeResults.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/C2VerifyServeResults.cpp b/extensions/http-curl/tests/C2VerifyServeResults.cpp new file mode 100644 index 0000000..fbbc8c8 --- /dev/null +++ b/extensions/http-curl/tests/C2VerifyServeResults.cpp @@ -0,0 +1,131 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "processors/InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "c2/C2Agent.h" +#include "protocols/RESTReceiver.h" +#include "HTTPIntegrationBase.h" +#include "processors/LogAttribute.h" + +class VerifyC2Server : public HTTPIntegrationBase { + public: + explicit VerifyC2Server(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("Import offset 0") == true); + + assert(LogTestController::getInstance().contains("Outputting 
success and response") == true); + } + + void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + + + std::string port, scheme, path; + parse_http_components(url, port, scheme, path); + configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver"); + configuration->set("c2.rest.listener.port", port); + configuration->set("c2.agent.heartbeat.period", "10"); + configuration->set("c2.rest.listener.heartbeat.rooturi", path); + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + VerifyC2Server harness(isSecure); + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt new file mode 100644 index 0000000..8fd89e9 --- /dev/null +++ b/extensions/http-curl/tests/CMakeLists.txt @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Unit tests live in unit/, integration tests sit next to this file.
file(GLOB CURL_UNIT_TESTS "unit/*.cpp")
file(GLOB CURL_INTEGRATION_TESTS "*.cpp")

# Running total of test executables defined by the two loops below.
set(CURL_INT_TEST_COUNT 0)

# One executable per unit test source, linked against the Catch main library.
foreach(testfile ${CURL_UNIT_TESTS})
  get_filename_component(testfilename "${testfile}" NAME_WE)
  add_executable("${testfilename}" "${testfile}")
  target_include_directories("${testfilename}" BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
  target_include_directories("${testfilename}" BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}")
  target_include_directories("${testfilename}" BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../client/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../processors/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../protocols/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../sitetosite/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/civetweb/")
  target_include_directories("${testfilename}" BEFORE PRIVATE ./include)
  createTests("${testfilename}")
  target_link_libraries("${testfilename}" ${CATCH_MAIN_LIB})
  # Both extension libraries are linked whole, using the linker flag of the platform.
  if(APPLE)
    target_link_libraries("${testfilename}" -Wl,-all_load minifi-http-curl minifi-civet-extensions)
  else()
    target_link_libraries("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-civet-extensions -Wl,--no-whole-archive)
  endif()
  math(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
endforeach()

# One executable per integration test source; these provide their own main().
foreach(testfile ${CURL_INTEGRATION_TESTS})
  get_filename_component(testfilename "${testfile}" NAME_WE)
  add_executable("${testfilename}" "${testfile}")
  target_include_directories("${testfilename}" BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
  target_include_directories("${testfilename}" BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
  target_include_directories("${testfilename}" BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../client/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../processors/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../protocols/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "../sitetosite/")
  target_include_directories("${testfilename}" BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/civetweb/")
  target_include_directories("${testfilename}" BEFORE PRIVATE ./include)
  createTests("${testfilename}")
  if(APPLE)
    target_link_libraries("${testfilename}" -Wl,-all_load minifi-http-curl minifi-civet-extensions)
  else()
    target_link_libraries("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-civet-extensions -Wl,--no-whole-archive)
  endif()
  math(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
endforeach()

message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...")

# Registered test runs: each passes a flow file and the resources directory.
add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/")
add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site")
add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/CivetStream.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h new file mode 100644 index 0000000..571b0ca --- /dev/null +++ b/extensions/http-curl/tests/CivetStream.h @@ -0,0 +1,138 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ +#define EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ + +#include <memory> +#include <thread> +#include <mutex> +#include <future> +#include <vector> + +#include "io/BaseStream.h" +#include "civetweb.h" +#include "CivetServer.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +class CivetStream : public io::BaseStream { + public: + /** + * File Stream constructor that accepts an fstream shared pointer. + * It must already be initialized for read and write. + */ + explicit CivetStream(struct mg_connection *conn) + : io::BaseStream(), conn(conn) { + + } + + virtual ~CivetStream() { + } + /** + * Skip to the specified offset. 
+ * @param offset offset to which we will skip + */ + void seek(uint64_t offset){ + + } + + const uint64_t getSize() const { + return BaseStream::readBuffer; + } + + // data stream extensions + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen) { + if (buf.capacity() < buflen) { + buf.resize(buflen); + } + int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); + + if (ret < buflen) { + buf.resize(ret); + } + return ret; + } + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(uint8_t *buf, int buflen) { + return mg_read(conn,buf,buflen); + } + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + virtual int writeData(std::vector<uint8_t> &buf, int buflen) { + return 0; + } + + /** + * writes value to stream + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size) { + return 0; + } + + protected: + + /** + * Creates a vector and returns the vector using the provided + * type name. + * @param t incoming object + * @returns vector. 
+ */ + template<typename T> + inline std::vector<uint8_t> readBuffer(const T& t) { + std::vector<uint8_t> buf; + buf.resize(sizeof t); + readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); + return buf; + } + + void reset(); + + //size_t pos; + struct mg_connection *conn; + + private: + + std::shared_ptr<logging::Logger> logger_; +}; +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp new file mode 100644 index 0000000..612603a --- /dev/null +++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp @@ -0,0 +1,160 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#undef NDEBUG +#include <cassert> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <utility> +#include <thread> +#include <type_traits> +#include <vector> + +#include "core/controller/ControllerServiceMap.h" +#include "core/controller/StandardControllerServiceNode.h" +#include "core/controller/StandardControllerServiceProvider.h" +#include "controllers/SSLContextService.h" +#include "core/Core.h" +#include "core/logging/LoggerConfiguration.h" +#include "core/ProcessGroup.h" +#include "core/Resource.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/MockClasses.h" +#include "unit/ProvenanceTestHelper.h" + +REGISTER_RESOURCE(MockControllerService); +REGISTER_RESOURCE(MockProcessor); + +std::shared_ptr<core::controller::StandardControllerServiceNode> newCsNode(std::shared_ptr<core::controller::ControllerServiceProvider> provider, const std::string id) { + std::shared_ptr<core::controller::ControllerService> service = std::make_shared<MockControllerService>(); + std::shared_ptr<core::controller::StandardControllerServiceNode> testNode = std::make_shared<core::controller::StandardControllerServiceNode>(service, provider, id, + std::make_shared<minifi::Configure>()); + return testNode; +} + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(2)); +} + +int main(int argc, char **argv) { + std::string test_file_location; + std::string key_dir; + + if (argc > 2) { + test_file_location = argv[1]; + key_dir = argv[1]; + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + std::string client_cert = "cn.crt.pem"; + 
std::string priv_key_file = "cn.ckey.pem"; + std::string passphrase = "cn.pass"; + std::string ca_cert = "nifi-cert.pem"; + configuration->set(minifi::Configure::nifi_security_client_certificate, test_file_location); + configuration->set(minifi::Configure::nifi_security_client_private_key, priv_key_file); + configuration->set(minifi::Configure::nifi_security_client_pass_phrase, passphrase); + configuration->set(minifi::Configure::nifi_default_directory, key_dir); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + content_repo->initialize(configuration); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), + content_repo, + DEFAULT_ROOT_GROUP_NAME, + true); + + disabled = false; + std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>(); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); + ptr.release(); + + std::shared_ptr<core::controller::StandardControllerServiceProvider> provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg, std::make_shared<minifi::Configure>()); + 
std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995"); + assert(mockNode != nullptr); + mockNode->enable(); + std::vector<std::shared_ptr<core::controller::ControllerServiceNode> > linkedNodes = mockNode->getLinkedControllerServices(); + assert(linkedNodes.size() == 1); + + std::shared_ptr<core::controller::ControllerServiceNode> notexistNode = pg->findControllerService("MockItLikeItsWrong"); + assert(notexistNode == nullptr); + + std::shared_ptr<core::controller::ControllerServiceNode> ssl_client_cont = nullptr; + std::shared_ptr<minifi::controllers::SSLContextService> ssl_client = nullptr; + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->load(); + controller->start(); + ssl_client_cont = controller->getControllerServiceNode("SSLClientServiceTest"); + ssl_client_cont->enable(); + assert(ssl_client_cont != nullptr); + assert(ssl_client_cont->getControllerServiceImplementation() != nullptr); + ssl_client = std::static_pointer_cast<minifi::controllers::SSLContextService>(ssl_client_cont->getControllerServiceImplementation()); + } + assert(ssl_client->getCACertificate().length() > 0); + // now let's disable one of the controller services. 
+ std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID"); + assert(cs_id != nullptr); + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->disableControllerService(cs_id); + disabled = true; + waitToVerifyProcessor(); + } + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->enableControllerService(cs_id); + disabled = false; + waitToVerifyProcessor(); + } + std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995"); + assert(cs_id->enabled()); +{ + std::lock_guard<std::mutex> lock(control_mutex); + controller->disableReferencingServices(mock_cont); + disabled = true; + waitToVerifyProcessor(); + } + assert(cs_id->enabled() == false); +{ + std::lock_guard<std::mutex> lock(control_mutex); + controller->enableReferencingServices(mock_cont); + disabled = false; + waitToVerifyProcessor(); + } + assert(cs_id->enabled() == true); + + controller->waitUnload(60000); + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/GetFileNoData.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/GetFileNoData.cpp b/extensions/http-curl/tests/GetFileNoData.cpp new file mode 100644 index 0000000..299d994 --- /dev/null +++ b/extensions/http-curl/tests/GetFileNoData.cpp @@ -0,0 +1,184 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include <cstring> +#include "protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +static std::vector<std::string> responses; + +class ConfigHandler : public CivetHandler { + public: + bool handlePost(CivetServer *server, struct mg_connection *conn) { + if (responses.size() > 0) { + std::string top_str = responses.back(); + responses.pop_back(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + top_str.length()); + mg_printf(conn, "%s", top_str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); 
+ myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + + const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\"" + "}]}"; + + responses.push_back(heartbeat_response); + + std::ifstream myfile(test_file_location.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}"; + responses.push_back(response); + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared< + minifi::Configure>(); + + 
configuration->set("c2.rest.url", + "http://localhost:9090/update"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = + std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< + TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, + test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared + <minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr + <core::YamlConfiguration + >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast + <TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = + std::make_shared<minifi::FlowController + >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( + test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup + >(ptr.get()); + ptr.release(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); + LogTestController::getInstance().reset(); + + return 0; +} 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HTTPHandlers.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h new file mode 100644 index 0000000..714090a --- /dev/null +++ b/extensions/http-curl/tests/HTTPHandlers.h @@ -0,0 +1,320 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "civetweb.h" +#include "CivetServer.h" +#include "concurrentqueue.h" +#include "CivetStream.h" +#include "io/CRCStream.h" +#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ +#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ +static std::atomic<int> transaction_id; +static std::atomic<int> transaction_id_output; + +class FlowObj { + public: + FlowObj() + : total_size(0) { + + } + explicit FlowObj(const FlowObj &&other) + : total_size(std::move(other.total_size)), + attributes(std::move(other.attributes)), + data(std::move(other.data)) { + + } + uint64_t total_size; + std::map<std::string, std::string> attributes; + std::vector<uint8_t> data; + +}; + +class SiteToSiteLocationResponder : public CivetHandler { + public: + explicit SiteToSiteLocationResponder(bool isSecure) + : isSecure(isSecure) { + } + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = "{" + "\"revision\": {" + "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" + "}," + "\"controller\": {" + "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," + "\"name\": \"NiFi Flow\"," + "\"siteToSiteSecure\": "; + site2site_rest_resp += (isSecure ? 
"true" : "false"); + site2site_rest_resp += "}}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + site2site_rest_resp.length()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + protected: + bool isSecure; +}; + +class PeerResponder : public CivetHandler { + public: + + explicit PeerResponder(const std::string base_url) + : base_url(base_url) { + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"localhost\", \"port\": 8082, \"secure\": false, \"flowFileCount\" : 0 }] }"; + std::stringstream headers; + headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; + mg_printf(conn, "%s", headers.str().c_str()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + protected: + std::string base_url; +}; + +class TransactionResponder : public CivetHandler { + public: + + explicit TransactionResponder(const std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri) + : base_url(base_url), + wrong_uri(wrong_uri), + empty_transaction_uri(empty_transaction_uri), + input_port(input_port), + port_id(port_id), + flow_files_feed_(nullptr) { + + if (input_port) { + transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96"; + transaction_id_str += std::to_string(transaction_id.load()); + transaction_id++; + } else { + transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95"; + transaction_id_str += std::to_string(transaction_id_output.load()); + transaction_id_output++; + } + } + + bool handlePost(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = ""; + std::stringstream headers; + headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() 
<< "\r\nx-location-uri-intent: "; + if (wrong_uri) + headers << "ohstuff\r\n"; + else + headers << "transaction-url\r\n"; + + std::string port_type; + + if (input_port) + port_type = "input-ports"; + else + port_type = "output-ports"; + if (!empty_transaction_uri) + headers << "Location: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n"; + headers << "Connection: close\r\n\r\n"; + mg_printf(conn, "%s", headers.str().c_str()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { + flow_files_feed_ = feed; + } + + std::string getTransactionId() { + return transaction_id_str; + } + protected: + std::string base_url; + std::string transaction_id_str; + bool wrong_uri; + bool empty_transaction_uri; + bool input_port; + std::string port_id; + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; +}; + +class FlowFileResponder : public CivetHandler { + public: + + explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum) + : wrong_uri(wrong_uri), + input_port(input_port), + invalid_checksum(invalid_checksum), + flow_files_feed_(nullptr) { + } + + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() { + return &flow_files_; + } + + void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { + flow_files_feed_ = feed; + } + + bool handlePost(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = ""; + std::stringstream headers; + + if (!wrong_uri) { + minifi::io::CivetStream civet_stream(conn); + minifi::io::CRCStream<minifi::io::CivetStream> stream(&civet_stream); + uint32_t num_attributes; + uint64_t total_size = 0; + total_size += stream.read(num_attributes); + + auto flow = std::make_shared<FlowObj>(); + + for (int i = 0; i < num_attributes; i++) { + std::string name, value; + total_size += 
stream.readUTF(name, true); + total_size += stream.readUTF(value, true); + flow->attributes[name] = value; + } + uint64_t length; + total_size += stream.read(length); + + total_size += length; + flow->data.resize(length); + flow->total_size = total_size; + + assert(stream.readData(flow->data.data(), length) == length); + + assert(flow->attributes["path"] == "."); + assert(!flow->attributes["uuid"].empty()); + assert(!flow->attributes["filename"].empty()); + + if (!invalid_checksum) { + site2site_rest_resp = std::to_string(stream.getCRC()); + flow_files_.enqueue(flow); + } else { + site2site_rest_resp = "Imawrongchecksumshortandstout"; + } + + headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; + } else { + headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n"; + } + + mg_printf(conn, "%s", headers.str().c_str()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + + if (flow_files_feed_->size_approx() > 0) { + std::shared_ptr<FlowObj> flow; + uint8_t buf[1]; + std::vector<std::shared_ptr<FlowObj>> flows; + uint64_t total = 0; + + while (flow_files_feed_->try_dequeue(flow)) { + flows.push_back(flow); + total += flow->total_size; + } + mg_printf(conn, "HTTP/1.1 200 OK\r\n" + "Content-Length: %llu\r\n" + "Content-Type: application/octet-stream\r\n" + "Connection: close\r\n\r\n", + total); + minifi::io::BaseStream serializer; + minifi::io::CRCStream<minifi::io::BaseStream> stream(&serializer); + for (auto flow : flows) { + uint32_t num_attributes = flow->attributes.size(); + stream.write(num_attributes); + for (auto entry : flow->attributes) { + stream.writeUTF(entry.first); + stream.writeUTF(entry.second); + } + uint64_t length = flow->data.size(); + stream.write(length); + stream.writeData(flow->data.data(), length); + } + auto ret = mg_write(conn, serializer.getBuffer(), 
total); + } else { + std::cout << "Nothing to transfer feed" << std::endl; + mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: " + "close\r\nContent-Length: 0\r\n"); + mg_printf(conn, "Content-Type: text/plain\r\n\r\n"); + + } + return true; + } + + void setFlowUrl(std::string flowUrl) { + base_url = flowUrl; + } + + protected: + // base url + std::string base_url; + // set the wrong url + bool wrong_uri; + // we are running an input port + bool input_port; + // invalid checksum is returned. + bool invalid_checksum; + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_; + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; +}; + +class DeleteTransactionResponder : public CivetHandler { + public: + + explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, int expected_resp_code) + : flow_files_feed_(nullptr), + base_url(base_url), + response_code(response_code) { + expected_resp_code_str = std::to_string(expected_resp_code); + } + + explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) + : flow_files_feed_(feed), + base_url(base_url), + response_code(response_code) { + } + + bool handleDelete(CivetServer *server, struct mg_connection *conn) { + + std::string site2site_rest_resp = ""; + std::stringstream headers; + std::string resp; + CivetServer::getParam(conn, "responseCode", resp); + headers << "HTTP/1.1 " << response_code << "\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\n"; + headers << "Connection: close\r\n\r\n"; + mg_printf(conn, "%s", headers.str().c_str()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { + flow_files_feed_ = feed; + } + + protected: + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; + std::string 
base_url; + std::string expected_resp_code_str; + std::string response_code; +}; + +#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */
