http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h b/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h deleted file mode 100644 index d188df3..0000000 --- a/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h +++ /dev/null @@ -1,320 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "civetweb.h" -#include "CivetServer.h" -#include "concurrentqueue.h" -#include "io/CivetStream.h" -#include "io/CRCStream.h" -#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ -#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ -static std::atomic<int> transaction_id; -static std::atomic<int> transaction_id_output; - -class FlowObj { - public: - FlowObj() - : total_size(0) { - - } - explicit FlowObj(const FlowObj &&other) - : total_size(std::move(other.total_size)), - attributes(std::move(other.attributes)), - data(std::move(other.data)) { - - } - uint64_t total_size; - std::map<std::string, std::string> attributes; - std::vector<uint8_t> data; - -}; - -class SiteToSiteLocationResponder : public CivetHandler { - public: - explicit SiteToSiteLocationResponder(bool isSecure) - : isSecure(isSecure) { - } - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::string site2site_rest_resp = "{" - "\"revision\": {" - "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" - "}," - "\"controller\": {" - "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," - "\"name\": \"NiFi Flow\"," - "\"siteToSiteSecure\": "; - site2site_rest_resp += (isSecure ? 
"true" : "false"); - site2site_rest_resp += "}}"; - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - site2site_rest_resp.length()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - protected: - bool isSecure; -}; - -class PeerResponder : public CivetHandler { - public: - - explicit PeerResponder(const std::string base_url) - : base_url(base_url) { - } - - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"localhost\", \"port\": 8082, \"secure\": false, \"flowFileCount\" : 0 }] }"; - std::stringstream headers; - headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; - mg_printf(conn, "%s", headers.str().c_str()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - protected: - std::string base_url; -}; - -class TransactionResponder : public CivetHandler { - public: - - explicit TransactionResponder(const std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri) - : base_url(base_url), - wrong_uri(wrong_uri), - empty_transaction_uri(empty_transaction_uri), - input_port(input_port), - port_id(port_id), - flow_files_feed_(nullptr) { - - if (input_port) { - transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96"; - transaction_id_str += std::to_string(transaction_id.load()); - transaction_id++; - } else { - transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95"; - transaction_id_str += std::to_string(transaction_id_output.load()); - transaction_id_output++; - } - } - - bool handlePost(CivetServer *server, struct mg_connection *conn) { - std::string site2site_rest_resp = ""; - std::stringstream headers; - headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() 
<< "\r\nx-location-uri-intent: "; - if (wrong_uri) - headers << "ohstuff\r\n"; - else - headers << "transaction-url\r\n"; - - std::string port_type; - - if (input_port) - port_type = "input-ports"; - else - port_type = "output-ports"; - if (!empty_transaction_uri) - headers << "Location: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n"; - headers << "Connection: close\r\n\r\n"; - mg_printf(conn, "%s", headers.str().c_str()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { - flow_files_feed_ = feed; - } - - std::string getTransactionId() { - return transaction_id_str; - } - protected: - std::string base_url; - std::string transaction_id_str; - bool wrong_uri; - bool empty_transaction_uri; - bool input_port; - std::string port_id; - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; -}; - -class FlowFileResponder : public CivetHandler { - public: - - explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum) - : wrong_uri(wrong_uri), - input_port(input_port), - invalid_checksum(invalid_checksum), - flow_files_feed_(nullptr) { - } - - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() { - return &flow_files_; - } - - void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { - flow_files_feed_ = feed; - } - - bool handlePost(CivetServer *server, struct mg_connection *conn) { - std::string site2site_rest_resp = ""; - std::stringstream headers; - - if (!wrong_uri) { - minifi::io::CivetStream civet_stream(conn); - minifi::io::CRCStream<minifi::io::CivetStream> stream(&civet_stream); - uint32_t num_attributes; - uint64_t total_size = 0; - total_size += stream.read(num_attributes); - - auto flow = std::make_shared<FlowObj>(); - - for (int i = 0; i < num_attributes; i++) { - std::string name, value; - total_size += 
stream.readUTF(name, true); - total_size += stream.readUTF(value, true); - flow->attributes[name] = value; - } - uint64_t length; - total_size += stream.read(length); - - total_size += length; - flow->data.resize(length); - flow->total_size = total_size; - - assert(stream.readData(flow->data.data(), length) == length); - - assert(flow->attributes["path"] == "."); - assert(!flow->attributes["uuid"].empty()); - assert(!flow->attributes["filename"].empty()); - - if (!invalid_checksum) { - site2site_rest_resp = std::to_string(stream.getCRC()); - flow_files_.enqueue(flow); - } else { - site2site_rest_resp = "Imawrongchecksumshortandstout"; - } - - headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; - } else { - headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n"; - } - - mg_printf(conn, "%s", headers.str().c_str()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - bool handleGet(CivetServer *server, struct mg_connection *conn) { - - if (flow_files_feed_->size_approx() > 0) { - std::shared_ptr<FlowObj> flow; - uint8_t buf[1]; - std::vector<std::shared_ptr<FlowObj>> flows; - uint64_t total = 0; - - while (flow_files_feed_->try_dequeue(flow)) { - flows.push_back(flow); - total += flow->total_size; - } - mg_printf(conn, "HTTP/1.1 200 OK\r\n" - "Content-Length: %llu\r\n" - "Content-Type: application/octet-stream\r\n" - "Connection: close\r\n\r\n", - total); - minifi::io::BaseStream serializer; - minifi::io::CRCStream<minifi::io::BaseStream> stream(&serializer); - for (auto flow : flows) { - uint32_t num_attributes = flow->attributes.size(); - stream.write(num_attributes); - for (auto entry : flow->attributes) { - stream.writeUTF(entry.first); - stream.writeUTF(entry.second); - } - uint64_t length = flow->data.size(); - stream.write(length); - stream.writeData(flow->data.data(), length); - } - auto ret = mg_write(conn, serializer.getBuffer(), 
total); - } else { - std::cout << "Nothing to transfer feed" << std::endl; - mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: " - "close\r\nContent-Length: 0\r\n"); - mg_printf(conn, "Content-Type: text/plain\r\n\r\n"); - - } - return true; - } - - void setFlowUrl(std::string flowUrl) { - base_url = flowUrl; - } - - protected: - // base url - std::string base_url; - // set the wrong url - bool wrong_uri; - // we are running an input port - bool input_port; - // invalid checksum is returned. - bool invalid_checksum; - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_; - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; -}; - -class DeleteTransactionResponder : public CivetHandler { - public: - - explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, int expected_resp_code) - : flow_files_feed_(nullptr), - base_url(base_url), - response_code(response_code) { - expected_resp_code_str = std::to_string(expected_resp_code); - } - - explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) - : flow_files_feed_(feed), - base_url(base_url), - response_code(response_code) { - } - - bool handleDelete(CivetServer *server, struct mg_connection *conn) { - - std::string site2site_rest_resp = ""; - std::stringstream headers; - std::string resp; - CivetServer::getParam(conn, "responseCode", resp); - headers << "HTTP/1.1 " << response_code << "\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\n"; - headers << "Connection: close\r\n\r\n"; - mg_printf(conn, "%s", headers.str().c_str()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { - flow_files_feed_ = feed; - } - - protected: - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; - std::string 
base_url; - std::string expected_resp_code_str; - std::string response_code; -}; - -#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civet_curl_tests/unit/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/unit/CMakeLists.txt b/extensions/civet_curl_tests/unit/CMakeLists.txt deleted file mode 100644 index b645da1..0000000 --- a/extensions/civet_curl_tests/unit/CMakeLists.txt +++ /dev/null @@ -1,76 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -file(GLOB CURL_UNIT_TESTS "unit/*.cpp") -file(GLOB CURL_INTEGRATION_TESTS "*.cpp") - -SET(CURL_INT_TEST_COUNT 0) - -FOREACH(testfile ${CURL_UNIT_TESTS}) - get_filename_component(testfilename "${testfile}" NAME_WE) - add_executable("${testfilename}" "${testfile}") - target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS}) - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/client/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/processors/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/protocols/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/sitetosite/") - target_link_libraries(${testfilename} ${CURL_LIBRARIES} ) - createTests("${testfilename}") - target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) - if (APPLE) - target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl ) - else () - target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive) - endif() - MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1") -ENDFOREACH() - -FOREACH(testfile ${CURL_INTEGRATION_TESTS}) - get_filename_component(testfilename "${testfile}" NAME_WE) - add_executable("${testfilename}" "${testfile}") - target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS}) - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/client/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/processors/") - target_include_directories(${testfilename} BEFORE PRIVATE 
"${CMAKE_SOURCE_DIR}/extensions/http-curl/protocols/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/sitetosite/") - target_link_libraries(${testfilename} ${CURL_LIBRARIES} ) - createTests("${testfilename}") - if (APPLE) - target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl ) - else () - target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive) - endif() - MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1") -ENDFOREACH() - -message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...") - -add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") -add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") -add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/") -add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/") -add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/") -add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/") -add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/") -add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/") -add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site") -add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests 
"${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/") -add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp b/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp deleted file mode 100644 index 81d2714..0000000 --- a/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp +++ /dev/null @@ -1,315 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include <uuid/uuid.h> -#include <fstream> -#include <map> -#include <memory> -#include <utility> -#include <string> -#include <set> -#include "FlowController.h" -#include "io/BaseStream.h" -#include "TestBase.h" -#include "processors/GetFile.h" -#include "core/Core.h" -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "processors/ListenHTTP.h" -#include "core/FlowFile.h" -#include "unit/ProvenanceTestHelper.h" -#include "core/Processor.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/ProcessorNode.h" -#include "processors/InvokeHTTP.h" -#include "processors/ListenHTTP.h" -#include "processors/LogAttribute.h" - -TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { - TestController testController; - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - - std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); - - std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); - - std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp"); - uuid_t processoruuid; - REQUIRE(true == listenhttp->getUUID(processoruuid)); - - uuid_t invokehttp_uuid; - REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - - std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, 
"getfileCreate2Connection"); - gcConnection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); - laConnection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp"); - - connection2->setRelationship(core::Relationship("No Retry", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(listenhttp); - - connection2->setSourceUUID(invokehttp_uuid); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(invokehttp_uuid); - - listenhttp->addConnection(connection); - invokehttp->addConnection(connection); - invokehttp->addConnection(connection2); - - std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp); - std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); - std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686"); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); - - 
context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest"); - auto session = std::make_shared<core::ProcessSession>(context); - auto session2 = std::make_shared<core::ProcessSession>(context2); - - REQUIRE(listenhttp->getName() == "listenhttp"); - - std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); - - std::shared_ptr<core::FlowFile> record; - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onSchedule(context, factory); - listenhttp->onTrigger(context, session); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); - invokehttp->onSchedule(context2, factory2); - invokehttp->onTrigger(context2, session2); - - provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); - std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session->get(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - listenhttp->incrementActiveTasks(); - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onTrigger(context, session); - - reporter = session->getProvenanceReporter(); - - records = reporter->getEvents(); - session->commit(); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - invokehttp->onTrigger(context2, session2); - - session2->commit(); - records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == listenhttp->getName()); - } - std::shared_ptr<core::FlowFile> ffr = session2->get(); - REQUIRE(true == 
LogTestController::getInstance().contains("exiting because method is POST")); - LogTestController::getInstance().reset(); -} - -class CallBack : public minifi::OutputStreamCallback { - public: - CallBack() { - } - virtual ~CallBack() { - } - virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) { - // leaving the typo for posterity sake - std::string st = "we're gnna write some test stuff"; - return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length()); - } -}; - -TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { - TestController testController; - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - - std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); - - std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); - - std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp"); - uuid_t processoruuid; - REQUIRE(true == listenhttp->getUUID(processoruuid)); - - uuid_t invokehttp_uuid; - REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - - std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); - gcConnection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> laConnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); - laConnection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp"); - - connection2->setRelationship(core::Relationship("No Retry", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(listenhttp); - - connection->setSourceUUID(invokehttp_uuid); - connection->setDestinationUUID(processoruuid); - - connection2->setSourceUUID(processoruuid); - connection2->setSourceUUID(processoruuid); - - listenhttp->addConnection(connection); - invokehttp->addConnection(connection); - invokehttp->addConnection(connection2); - - - std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp); - std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); - std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680"); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); - - context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, 
"http://localhost:8680/testytesttest"); - auto session = std::make_shared<core::ProcessSession>(context); - auto session2 = std::make_shared<core::ProcessSession>(context2); - - REQUIRE(listenhttp->getName() == "listenhttp"); - - std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); - - std::shared_ptr<core::FlowFile> record; - - CallBack callback; - - std::map<std::string, std::string> attributes; - attributes["testy"] = "test"; - std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes); - session2->write(flow, &callback); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); - invokehttp->onSchedule(context2, factory2); - invokehttp->onTrigger(context2, session2); - - listenhttp->incrementActiveTasks(); - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onSchedule(context, factory); - listenhttp->onTrigger(context, session); - - provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); - std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session->get(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - listenhttp->incrementActiveTasks(); - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onTrigger(context, session); - - reporter = session->getProvenanceReporter(); - - records = reporter->getEvents(); - session->commit(); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - invokehttp->onTrigger(context2, session2); - - session2->commit(); - records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == 
listenhttp->getName()); - } - std::shared_ptr<core::FlowFile> ffr = session2->get(); - REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); - LogTestController::getInstance().reset(); -} - -TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { - TestController testController; - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::ListenHTTP>(); - LogTestController::getInstance().setInfo<core::Processor>(); - - std::shared_ptr<TestPlan> plan = testController.createPlan(); - std::shared_ptr<core::Processor> processor = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false); - std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true); - - REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685")); - REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest")); - - REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST")); - REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest")); - plan->reset(); - testController.runSession(plan, true); - - std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); - std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - plan->reset(); - testController.runSession(plan, true); - - records = plan->getProvenanceRecords(); - record = plan->getCurrentFlowFile(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - 
REQUIRE(provEventRecord->getComponentType() == processor->getName()); - } - std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile(); - REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); - LogTestController::getInstance().reset(); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/civetweb/CMakeLists.txt b/extensions/civetweb/CMakeLists.txt index d31baad..470a231 100644 --- a/extensions/civetweb/CMakeLists.txt +++ b/extensions/civetweb/CMakeLists.txt @@ -35,7 +35,7 @@ add_subdirectory(${CIVET_THIRDPARTY_ROOT} ${CIVET_BINARY_ROOT} EXCLUDE_FROM_ALL) -file(GLOB SOURCES "*.cpp") +file(GLOB SOURCES "processors/*.cpp") add_library(minifi-civet-extensions STATIC ${SOURCES}) set_property(TARGET minifi-civet-extensions PROPERTY POSITION_INDEPENDENT_CODE ON) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/CivetLoader.cpp ---------------------------------------------------------------------- diff --git a/extensions/civetweb/CivetLoader.cpp b/extensions/civetweb/CivetLoader.cpp deleted file mode 100644 index c593bb7..0000000 --- a/extensions/civetweb/CivetLoader.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "CivetLoader.h" -#include "core/FlowConfiguration.h" - -bool CivetFactory::added = core::FlowConfiguration::add_static_func("createCivetFactory"); - -extern "C" { - -void *createCivetFactory(void) { - return new CivetFactory(); -} - -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/extensions/civetweb/ListenHTTP.cpp b/extensions/civetweb/ListenHTTP.cpp deleted file mode 100644 index 62f8194..0000000 --- a/extensions/civetweb/ListenHTTP.cpp +++ /dev/null @@ -1,333 +0,0 @@ -/** - * @file ListenHTTP.cpp - - * ListenHTTP class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "processors/ListenHTTP.h" -#include <uuid/uuid.h> -#include <CivetServer.h> -#include <stdio.h> -#include <sstream> -#include <utility> -#include <memory> -#include <string> -#include <iostream> -#include <fstream> -#include <set> -#include <vector> -#include "utils/TimeUtil.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/ProcessSessionFactory.h" -#include "core/logging/LoggerConfiguration.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener"); -core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", ""); -core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming" - " connections. If the Pattern does not match the DN, the connection will be refused.", - ".*"); -core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", ""); -core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", ""); -core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no"); -core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2"); -core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that" - " should be passed along as FlowFile attributes", - ""); - -core::Relationship ListenHTTP::Success("success", "All files are routed to success"); - -void ListenHTTP::initialize() { - logger_->log_info("Initializing ListenHTTP"); - - // 
Set the supported properties - std::set<core::Property> properties; - properties.insert(BasePath); - properties.insert(Port); - properties.insert(AuthorizedDNPattern); - properties.insert(SSLCertificate); - properties.insert(SSLCertificateAuthority); - properties.insert(SSLVerifyPeer); - properties.insert(SSLMinimumVersion); - properties.insert(HeadersAsAttributesRegex); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { - std::string basePath; - - if (!context->getProperty(BasePath.getName(), basePath)) { - logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str()); - basePath = BasePath.getValue(); - } - - basePath.insert(0, "/"); - - std::string listeningPort; - - if (!context->getProperty(Port.getName(), listeningPort)) { - logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str()); - return; - } - - std::string authDNPattern; - - if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) { - logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str()); - } - - std::string sslCertFile; - - if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) { - logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str()); - } - - // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set - std::string sslCertAuthorityFile; - std::string sslVerifyPeer; - std::string sslMinVer; - - if (!sslCertFile.empty()) { - if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && 
!sslCertAuthorityFile.empty()) { - logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str()); - } - - if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) { - if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { - logger_->log_info("ListenHTTP will not verify peers"); - } else { - logger_->log_info("ListenHTTP will verify peers"); - } - } else { - logger_->log_info("ListenHTTP will not verify peers"); - } - - if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) { - logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str()); - } - } - - std::string headersAsAttributesPattern; - - if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) { - logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), headersAsAttributesPattern.c_str()); - } - - auto numThreads = getMaxConcurrentTasks(); - - logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads); - - // Initialize web server - std::vector<std::string> options; - options.push_back("enable_keep_alive"); - options.push_back("yes"); - options.push_back("keep_alive_timeout_ms"); - options.push_back("15000"); - options.push_back("num_threads"); - options.push_back(std::to_string(numThreads)); - - if (sslCertFile.empty()) { - options.push_back("listening_ports"); - options.push_back(listeningPort); - } else { - listeningPort += "s"; - options.push_back("listening_ports"); - options.push_back(listeningPort); - - options.push_back("ssl_certificate"); - options.push_back(sslCertFile); - - if (!sslCertAuthorityFile.empty()) { - options.push_back("ssl_ca_file"); - options.push_back(sslCertAuthorityFile); - } - - if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { - 
options.push_back("ssl_verify_peer"); - options.push_back("no"); - } else { - options.push_back("ssl_verify_peer"); - options.push_back("yes"); - } - - if (sslMinVer.compare("SSL2") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(0)); - } else if (sslMinVer.compare("SSL3") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(1)); - } else if (sslMinVer.compare("TLS1.0") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(2)); - } else if (sslMinVer.compare("TLS1.1") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(3)); - } else { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(4)); - } - } - - _server.reset(new CivetServer(options)); - _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern))); - _server->addHandler(basePath, _handler.get()); -} - -ListenHTTP::~ListenHTTP() { -} - -void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); - - // Do nothing if there are no incoming files - if (!flowFile) { - return; - } -} - -ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern) - : _authDNRegex(std::move(authDNPattern)), - _headersAsAttributesRegex(std::move(headersAsAttributesPattern)), - logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) { - _processContext = context; - _processSessionFactory = sessionFactory; -} - -void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) { - mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); -} - -bool 
ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) { - auto req_info = mg_get_request_info(conn); - logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length); - - // If this is a two-way TLS connection, authorize the peer against the configured pattern - if (req_info->is_ssl && req_info->client_cert != nullptr) { - if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) { - mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); - logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject); - return true; - } - } - - // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html) - mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n"); - - auto session = _processSessionFactory->createSession(); - ListenHTTP::WriteCallback callback(conn, req_info); - auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); - - if (!flowFile) { - sendErrorResponse(conn); - return true; - } - - try { - session->write(flowFile, &callback); - - // Add filename from "filename" header value (and pattern headers) - for (int i = 0; i < req_info->num_headers; i++) { - auto header = &req_info->http_headers[i]; - - if (strcmp("filename", header->name) == 0) { - if (!flowFile->updateAttribute("filename", header->value)) { - flowFile->addAttribute("filename", header->value); - } - } else if (std::regex_match(header->name, _headersAsAttributesRegex)) { - if (!flowFile->updateAttribute(header->name, header->value)) { - flowFile->addAttribute(header->name, header->value); - } - } - } - - session->transfer(flowFile, Success); - session->commit(); - } catch (std::exception &exception) { - logger_->log_debug("ListenHTTP Caught Exception %s", exception.what()); - sendErrorResponse(conn); - session->rollback(); - throw; - } catch (...) 
{ - logger_->log_debug("ListenHTTP Caught Exception Processor::onTrigger"); - sendErrorResponse(conn); - session->rollback(); - throw; - } - - mg_printf(conn, "HTTP/1.1 200 OK\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); - - return true; -} - -ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) - : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) { - _conn = conn; - _reqInfo = reqInfo; -} - -int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) { - int64_t rlen; - int64_t nlen = 0; - int64_t tlen = _reqInfo->content_length; - uint8_t buf[16384]; - - // if we have no content length we should call mg_read until - // there is no data left from the stream to be HTTP/1.1 compliant - while (tlen == -1 || nlen < tlen) { - rlen = tlen == -1 ? sizeof(buf) : tlen - nlen; - - if (rlen > (int64_t)sizeof(buf)) { - rlen = (int64_t)sizeof(buf); - } - - // Read a buffer of data from client - rlen = mg_read(_conn, &buf[0], (size_t) rlen); - - if (rlen <= 0) { - break; - } - - // Transfer buffer data to the output stream - stream->write(&buf[0], rlen); - - nlen += rlen; - } - - return nlen; -} - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/RESTReceiver.cpp ---------------------------------------------------------------------- diff --git a/extensions/civetweb/RESTReceiver.cpp b/extensions/civetweb/RESTReceiver.cpp deleted file mode 100644 index 1f015ad..0000000 --- a/extensions/civetweb/RESTReceiver.cpp +++ /dev/null @@ -1,147 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "c2/protocols/RESTReceiver.h" -#include <algorithm> -#include <memory> -#include <utility> -#include <map> -#include <string> -#include <vector> - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -int log_message(const struct mg_connection *conn, const char *message) { - puts(message); - return 1; -} - -int ssl_protocol_en(void *ssl_context, void *user_data) { - return 0; -} - -RESTReceiver::RESTReceiver(std::string name, uuid_t uuid) - : HeartBeatReporter(name, uuid), - logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) { -} - -void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { - HeartBeatReporter::initialize(controller, configure); - logger_->log_debug("Initializing rest receiveer"); - if (nullptr != configuration_) { - std::string listeningPort, rootUri, caCert; - configuration_->get("c2.rest.listener.port", listeningPort); - configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri); - configuration_->get("c2.rest.listener.cacert", caCert); - - if (!listeningPort.empty() && !rootUri.empty()) { - handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol()); - if (!caCert.empty()) { - listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert); - } else { - 
listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get())); - } - } - } -} -int16_t RESTReceiver::heartbeat(const C2Payload &payload) { - std::string operation_request_str = getOperation(payload); - std::string outputConfig; - Json::Value json_payload; - json_payload["operation"] = operation_request_str; - if (payload.getIdentifier().length() > 0) { - json_payload["operationid"] = payload.getIdentifier(); - } - const std::vector<C2ContentResponse> &content = payload.getContent(); - - for (const auto &payload_content : content) { - Json::Value payload_content_values; - bool use_sub_option = true; - if (payload_content.op == payload.getOperation()) { - for (auto content : payload_content.operation_arguments) { - if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { - json_payload[payload_content.name] = content.second; - use_sub_option = false; - } else { - payload_content_values[content.first] = content.second; - } - } - } - if (use_sub_option) - json_payload[payload_content.name] = payload_content_values; - } - - for (const auto &nested_payload : payload.getNestedPayloads()) { - json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); - } - - Json::StyledWriter writer; - outputConfig = writer.write(json_payload); - if (handler != nullptr) { - logger_->log_debug("Setting %s", outputConfig); - handler->setResponse(outputConfig); - } - - return 0; -} - -std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) { - struct mg_callbacks callback; - - memset(&callback, 0, sizeof(callback)); - callback.init_ssl = ssl_protocol_en; - std::string my_port = port; - my_port += "s"; - callback.log_message = log_message; - const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL", - 
"ssl_verify_peer", "no", "num_threads", "1", 0 }; - - std::vector<std::string> cpp_options; - for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); - - server->addHandler(rooturi, handler); - - return server; -} - -std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) { - const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 }; - - std::vector<std::string> cpp_options; - for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); - - server->addHandler(rooturi, handler); - - return server; -} - -} /* namespace c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/include/CivetLoader.h ---------------------------------------------------------------------- diff --git a/extensions/civetweb/include/CivetLoader.h b/extensions/civetweb/include/CivetLoader.h deleted file mode 100644 index f571b5d..0000000 --- a/extensions/civetweb/include/CivetLoader.h +++ /dev/null @@ -1,70 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef EXTENSION_CIVETLOADER_H -#define EXTENSION_CIVETLOADER_H - -#include "core/ClassLoader.h" -#include "c2/protocols/RESTReceiver.h" -#include "processors/ListenHTTP.h" - -class __attribute__((visibility("default"))) CivetFactory : public core::ObjectFactory { - public: - CivetFactory() { - - } - - /** - * Gets the name of the object. - * @return class name of processor - */ - virtual std::string getName() { - return "CivetFactory"; - } - - virtual std::string getClassName() { - return "CivetFactory"; - } - /** - * Gets the class name for the object - * @return class name for the processor. 
- */ - virtual std::vector<std::string> getClassNames() { - std::vector<std::string> class_names; - class_names.push_back("RESTReceiver"); - class_names.push_back("ListenHTTP"); - return class_names; - } - - virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) { - if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTReceiver")) { - return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTReceiver>()); - } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ListenHTTP")) { - return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<processors::ListenHTTP>()); - } else { - return nullptr; - } - } - - static bool added; - -}; - -extern "C" { -void *createCivetFactory(void); -} -#endif /* EXTENSION_CIVETLOADER_H */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/include/c2/protocols/RESTReceiver.h ---------------------------------------------------------------------- diff --git a/extensions/civetweb/include/c2/protocols/RESTReceiver.h b/extensions/civetweb/include/c2/protocols/RESTReceiver.h deleted file mode 100644 index 4793ee3..0000000 --- a/extensions/civetweb/include/c2/protocols/RESTReceiver.h +++ /dev/null @@ -1,110 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ -#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ - -#include "json/json.h" -#include "json/writer.h" -#include <string> -#include <mutex> -#include "core/Resource.h" -#include "c2/protocols/RESTProtocol.h" -#include "CivetServer.h" -#include "c2/C2Protocol.h" -#include "controllers/SSLContextService.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { - -int log_message(const struct mg_connection *conn, const char *message); - -int ssl_protocol_en(void *ssl_context, void *user_data); - -/** - * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. - * - * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST - * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction - * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. 
- * - */ -class RESTReceiver : public RESTProtocol, public HeartBeatReporter { - public: - RESTReceiver(std::string name, uuid_t uuid = nullptr); - - virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override; - virtual int16_t heartbeat(const C2Payload &heartbeat) override; - - protected: - - class ListeningProtocol : public CivetHandler { - - public: - ListeningProtocol() { - - } - - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::string currentvalue; - { - std::lock_guard<std::mutex> lock(reponse_mutex_); - currentvalue = resp_; - } - - std::stringstream output; - output << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << currentvalue.length() << "\r\nConnection: close\r\n\r\n"; - - mg_printf(conn, "%s", output.str().c_str()); - mg_printf(conn, "%s", currentvalue.c_str()); - return true; - } - - void setResponse(std::string response) { - std::lock_guard<std::mutex> lock(reponse_mutex_); - resp_ = response; - } - - protected: - std::mutex reponse_mutex_; - std::string resp_; - - }; - - std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert); - - std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler); - - std::unique_ptr<CivetServer> listener; - std::unique_ptr<ListeningProtocol> handler; - - private: - std::shared_ptr<logging::Logger> logger_; -}; - -REGISTER_RESOURCE(RESTReceiver); - -} /* namesapce c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/include/io/CivetStream.h ---------------------------------------------------------------------- diff --git 
a/extensions/civetweb/include/io/CivetStream.h b/extensions/civetweb/include/io/CivetStream.h deleted file mode 100644 index 571b0ca..0000000 --- a/extensions/civetweb/include/io/CivetStream.h +++ /dev/null @@ -1,138 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ -#define EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ - -#include <memory> -#include <thread> -#include <mutex> -#include <future> -#include <vector> - -#include "io/BaseStream.h" -#include "civetweb.h" -#include "CivetServer.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace io { - -class CivetStream : public io::BaseStream { - public: - /** - * File Stream constructor that accepts an fstream shared pointer. - * It must already be initialized for read and write. - */ - explicit CivetStream(struct mg_connection *conn) - : io::BaseStream(), conn(conn) { - - } - - virtual ~CivetStream() { - } - /** - * Skip to the specified offset. 
- * @param offset offset to which we will skip - */ - void seek(uint64_t offset){ - - } - - const uint64_t getSize() const { - return BaseStream::readBuffer; - } - - // data stream extensions - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) { - buf.resize(buflen); - } - int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); - - if (ret < buflen) { - buf.resize(ret); - } - return ret; - } - - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(uint8_t *buf, int buflen) { - return mg_read(conn,buf,buflen); - } - - /** - * Write value to the stream using std::vector - * @param buf incoming buffer - * @param buflen buffer to write - * - */ - virtual int writeData(std::vector<uint8_t> &buf, int buflen) { - return 0; - } - - /** - * writes value to stream - * @param value value to write - * @param size size of value - */ - virtual int writeData(uint8_t *value, int size) { - return 0; - } - - protected: - - /** - * Creates a vector and returns the vector using the provided - * type name. - * @param t incoming object - * @returns vector. 
- */ - template<typename T> - inline std::vector<uint8_t> readBuffer(const T& t) { - std::vector<uint8_t> buf; - buf.resize(sizeof t); - readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); - return buf; - } - - void reset(); - - //size_t pos; - struct mg_connection *conn; - - private: - - std::shared_ptr<logging::Logger> logger_; -}; -} /* namespace io */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/include/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/extensions/civetweb/include/processors/ListenHTTP.h b/extensions/civetweb/include/processors/ListenHTTP.h deleted file mode 100644 index 1b58dcd..0000000 --- a/extensions/civetweb/include/processors/ListenHTTP.h +++ /dev/null @@ -1,121 +0,0 @@ -/** - * @file ListenHTTP.h - * ListenHTTP class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-#ifndef __LISTEN_HTTP_H__
-#define __LISTEN_HTTP_H__
-
-#include <memory>
-#include <regex>
-
-#include <CivetServer.h>
-
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-// ListenHTTP Class
-// Processor declaration: owns a CivetServer and a Handler that serves POST requests.
-class ListenHTTP : public core::Processor {
- public:
-
-  // Constructor
-  /*!
-   * Create a new processor
-   * name and uuid are forwarded unchanged to core::Processor; the logger is
-   * obtained from LoggerFactory<ListenHTTP>.
-   */
-  ListenHTTP(std::string name, uuid_t uuid = NULL)
-      : Processor(name, uuid),
-        logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) {
-  }
-  // Destructor
-  virtual ~ListenHTTP();
-  // Processor Name
-  static constexpr char const* ProcessorName = "ListenHTTP";
-  // Supported Properties
-  static core::Property BasePath;
-  static core::Property Port;
-  static core::Property AuthorizedDNPattern;
-  static core::Property SSLCertificate;
-  static core::Property SSLCertificateAuthority;
-  static core::Property SSLVerifyPeer;
-  static core::Property SSLMinimumVersion;
-  static core::Property HeadersAsAttributesRegex;
-  // Supported Relationships
-  static core::Relationship Success;
-
-  // Processor entry points; their definitions are not part of this header
-  void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
-  void initialize();
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
-
-  // HTTP request handler
-  class Handler : public CivetHandler {
-   public:
-    // The two pattern strings are taken by rvalue reference; the members below hold them as std::regex
-    Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);
-    // Handles a POST request arriving on conn
-    bool handlePost(
-        CivetServer *server, struct mg_connection *conn);
-
-   private:
-    // Send HTTP 500 error response to client
-    void sendErrorResponse(struct mg_connection *conn);
-    // Logger
-    std::shared_ptr<logging::Logger> logger_;
-
-    // Regular expressions for the authorized-DN and headers-as-attributes patterns
-    std::regex _authDNRegex;
-    std::regex _headersAsAttributesRegex;
-    // Non-owning pointers supplied by the caller of the constructor
-    core::ProcessContext *_processContext;
-    core::ProcessSessionFactory *_processSessionFactory;
-  };
-
-  // Write callback for transferring data from HTTP request to content repo
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
-    // Writes to the given stream and returns a byte count
-    int64_t process(std::shared_ptr<io::BaseStream> stream);
-
-   private:
-    // Logger
-    std::shared_ptr<logging::Logger> logger_;
-
-    // Connection and request metadata; both are raw, non-owning pointers
-    struct mg_connection *_conn;
-    const struct mg_request_info *_reqInfo;
-  };
-
- private:
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
-
-  // Embedded web server and the handler instance owned by this processor
-  std::unique_ptr<CivetServer> _server;
-  std::unique_ptr<Handler> _handler;
-};
-
-REGISTER_RESOURCE(ListenHTTP);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
new file mode 100644
index 0000000..73ade40
--- /dev/null
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -0,0 +1,333 @@
+/**
+ * @file ListenHTTP.cpp
+
+ * ListenHTTP class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ListenHTTP.h" +#include <uuid/uuid.h> +#include <CivetServer.h> +#include <stdio.h> +#include <sstream> +#include <utility> +#include <memory> +#include <string> +#include <iostream> +#include <fstream> +#include <set> +#include <vector> +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener"); +core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", ""); +core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming" + " connections. 
If the Pattern does not match the DN, the connection will be refused.", + ".*"); +core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", ""); +core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", ""); +core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no"); +core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2"); +core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that" + " should be passed along as FlowFile attributes", + ""); + +core::Relationship ListenHTTP::Success("success", "All files are routed to success"); + +void ListenHTTP::initialize() { + logger_->log_info("Initializing ListenHTTP"); + + // Set the supported properties + std::set<core::Property> properties; + properties.insert(BasePath); + properties.insert(Port); + properties.insert(AuthorizedDNPattern); + properties.insert(SSLCertificate); + properties.insert(SSLCertificateAuthority); + properties.insert(SSLVerifyPeer); + properties.insert(SSLMinimumVersion); + properties.insert(HeadersAsAttributesRegex); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string basePath; + + if (!context->getProperty(BasePath.getName(), basePath)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str()); + basePath = 
BasePath.getValue(); + } + + basePath.insert(0, "/"); + + std::string listeningPort; + + if (!context->getProperty(Port.getName(), listeningPort)) { + logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str()); + return; + } + + std::string authDNPattern; + + if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) { + logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str()); + } + + std::string sslCertFile; + + if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) { + logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str()); + } + + // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set + std::string sslCertAuthorityFile; + std::string sslVerifyPeer; + std::string sslMinVer; + + if (!sslCertFile.empty()) { + if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) { + logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str()); + } + + if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) { + if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { + logger_->log_info("ListenHTTP will not verify peers"); + } else { + logger_->log_info("ListenHTTP will verify peers"); + } + } else { + logger_->log_info("ListenHTTP will not verify peers"); + } + + if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) { + logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str()); + } + } + + std::string headersAsAttributesPattern; + + if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) { + logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), 
headersAsAttributesPattern.c_str()); + } + + auto numThreads = getMaxConcurrentTasks(); + + logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads); + + // Initialize web server + std::vector<std::string> options; + options.push_back("enable_keep_alive"); + options.push_back("yes"); + options.push_back("keep_alive_timeout_ms"); + options.push_back("15000"); + options.push_back("num_threads"); + options.push_back(std::to_string(numThreads)); + + if (sslCertFile.empty()) { + options.push_back("listening_ports"); + options.push_back(listeningPort); + } else { + listeningPort += "s"; + options.push_back("listening_ports"); + options.push_back(listeningPort); + + options.push_back("ssl_certificate"); + options.push_back(sslCertFile); + + if (!sslCertAuthorityFile.empty()) { + options.push_back("ssl_ca_file"); + options.push_back(sslCertAuthorityFile); + } + + if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { + options.push_back("ssl_verify_peer"); + options.push_back("no"); + } else { + options.push_back("ssl_verify_peer"); + options.push_back("yes"); + } + + if (sslMinVer.compare("SSL2") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(0)); + } else if (sslMinVer.compare("SSL3") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(1)); + } else if (sslMinVer.compare("TLS1.0") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(2)); + } else if (sslMinVer.compare("TLS1.1") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(3)); + } else { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(4)); + } + } + + _server.reset(new CivetServer(options)); + _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern))); + _server->addHandler(basePath, 
_handler.get()); +} + +ListenHTTP::~ListenHTTP() { +} + +void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); + + // Do nothing if there are no incoming files + if (!flowFile) { + return; + } +} + +ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern) + : _authDNRegex(std::move(authDNPattern)), + _headersAsAttributesRegex(std::move(headersAsAttributesPattern)), + logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) { + _processContext = context; + _processSessionFactory = sessionFactory; +} + +void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); +} + +bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) { + auto req_info = mg_get_request_info(conn); + logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length); + + // If this is a two-way TLS connection, authorize the peer against the configured pattern + if (req_info->is_ssl && req_info->client_cert != nullptr) { + if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) { + mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); + logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject); + return true; + } + } + + // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html) + mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n"); + + auto session = _processSessionFactory->createSession(); + ListenHTTP::WriteCallback callback(conn, req_info); + auto flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create()); + + if (!flowFile) { + sendErrorResponse(conn); + return true; + } + + try { + session->write(flowFile, &callback); + + // Add filename from "filename" header value (and pattern headers) + for (int i = 0; i < req_info->num_headers; i++) { + auto header = &req_info->http_headers[i]; + + if (strcmp("filename", header->name) == 0) { + if (!flowFile->updateAttribute("filename", header->value)) { + flowFile->addAttribute("filename", header->value); + } + } else if (std::regex_match(header->name, _headersAsAttributesRegex)) { + if (!flowFile->updateAttribute(header->name, header->value)) { + flowFile->addAttribute(header->name, header->value); + } + } + } + + session->transfer(flowFile, Success); + session->commit(); + } catch (std::exception &exception) { + logger_->log_debug("ListenHTTP Caught Exception %s", exception.what()); + sendErrorResponse(conn); + session->rollback(); + throw; + } catch (...) { + logger_->log_debug("ListenHTTP Caught Exception Processor::onTrigger"); + sendErrorResponse(conn); + session->rollback(); + throw; + } + + mg_printf(conn, "HTTP/1.1 200 OK\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); + + return true; +} + +ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) + : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) { + _conn = conn; + _reqInfo = reqInfo; +} + +int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) { + int64_t rlen; + int64_t nlen = 0; + int64_t tlen = _reqInfo->content_length; + uint8_t buf[16384]; + + // if we have no content length we should call mg_read until + // there is no data left from the stream to be HTTP/1.1 compliant + while (tlen == -1 || nlen < tlen) { + rlen = tlen == -1 ? 
sizeof(buf) : tlen - nlen; + + if (rlen > (int64_t)sizeof(buf)) { + rlen = (int64_t)sizeof(buf); + } + + // Read a buffer of data from client + rlen = mg_read(_conn, &buf[0], (size_t) rlen); + + if (rlen <= 0) { + break; + } + + // Transfer buffer data to the output stream + stream->write(&buf[0], rlen); + + nlen += rlen; + } + + return nlen; +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h new file mode 100644 index 0000000..5199d19 --- /dev/null +++ b/extensions/civetweb/processors/ListenHTTP.h @@ -0,0 +1,122 @@ +/** + * @file ListenHTTP.h + * ListenHTTP class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#ifndef __LISTEN_HTTP_H__
+#define __LISTEN_HTTP_H__
+
+#include <memory>
+#include <regex>
+
+#include <CivetServer.h>
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// ListenHTTP Class
+// Processor that runs an embedded CivetServer and stores the body of each POST
+// request received under the configured base path in a new flow file, which is
+// transferred to the Success relationship.
+class ListenHTTP : public core::Processor {
+ public:
+
+  // Constructor
+  /*!
+   * Create a new processor
+   * name and uuid are forwarded unchanged to core::Processor; the logger is
+   * obtained from LoggerFactory<ListenHTTP>.
+   */
+  ListenHTTP(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) {
+  }
+  // Destructor
+  virtual ~ListenHTTP();
+  // Processor Name
+  static constexpr char const* ProcessorName = "ListenHTTP";
+  // Supported Properties
+  // Path the request handler is registered under (a leading "/" is added)
+  static core::Property BasePath;
+  // Port the server listens on; no server is started when it cannot be read
+  static core::Property Port;
+  // Regular expression the client certificate subject must match
+  static core::Property AuthorizedDNPattern;
+  // PEM file with certificate and key; a non-empty value switches TLS handling on
+  static core::Property SSLCertificate;
+  // PEM file with trusted certificates (only read when SSLCertificate is set)
+  static core::Property SSLCertificateAuthority;
+  // Empty or "no" disables peer verification; any other value enables it
+  static core::Property SSLVerifyPeer;
+  // One of SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2
+  static core::Property SSLMinimumVersion;
+  // Regular expression selecting the request headers copied to flow file attributes
+  static core::Property HeadersAsAttributesRegex;
+  // Supported Relationships
+  static core::Relationship Success;
+
+  // Fetches a flow file from the session and returns without further action
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  // Registers the supported properties and relationships
+  void initialize();
+  // Reads the properties, starts the web server and registers the request handler
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+
+  // HTTP request handler
+  class Handler : public CivetHandler {
+   public:
+    // Both patterns are compiled into std::regex members; context and
+    // sessionFactory are stored as raw, non-owning pointers.
+    Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);
+    // Handles one POST request: sends 403 if the client DN is not authorized,
+    // 500 if the flow file cannot be created or written, otherwise 200.
+    bool handlePost(CivetServer *server, struct mg_connection *conn);
+
+   private:
+    // Send HTTP 500 error response to client
+    void sendErrorResponse(struct mg_connection *conn);
+
+    // Matched against the client certificate subject on TLS connections
+    std::regex _authDNRegex;
+    // Matched against each request header name
+    std::regex _headersAsAttributesRegex;
+    core::ProcessContext *_processContext;
+    // Used to create one session per handled request
+    core::ProcessSessionFactory *_processSessionFactory;
+
+    // Logger
+    std::shared_ptr<logging::Logger> logger_;
+  };
+
+  // Write callback for transferring data from HTTP request to content repo
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
+    // Copies the request body from the connection into stream and returns the
+    // number of bytes read. Reads until the request's content_length is
+    // reached, or until mg_read returns <= 0 when content_length is -1.
+    int64_t process(std::shared_ptr<io::BaseStream> stream);
+
+   private:
+    // Logger
+    std::shared_ptr<logging::Logger> logger_;
+
+    // Connection the body is read from (non-owning)
+    struct mg_connection *_conn;
+    // Request metadata; supplies content_length (non-owning)
+    const struct mg_request_info *_reqInfo;
+  };
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+  // NOTE(review): members are destroyed in reverse declaration order, so
+  // _handler is destroyed before _server, while the server was given the raw
+  // handler pointer in onSchedule. Confirm no request can be dispatched in between.
+  std::unique_ptr<CivetServer> _server;
+  std::unique_ptr<Handler> _handler;
+};
+
+REGISTER_RESOURCE(ListenHTTP);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/http-curl/CMakeLists.txt b/extensions/http-curl/CMakeLists.txt
index b855e8e..dfa4259 100644
--- a/extensions/http-curl/CMakeLists.txt
+++ b/extensions/http-curl/CMakeLists.txt
@@ -22,7 +22,7 @@ find_package(CURL REQUIRED) set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
-include_directories(../../libminifi/include ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT} ../../thirdparty/jsoncpp/include
../../thirdparty/) include_directories(protocols client processors sitetosite) file(GLOB SOURCES "*.cpp" "protocols/*.cpp" "client/*.cpp" "processors/*.cpp" "sitetosite/*.cpp") @@ -49,6 +49,7 @@ find_package(UUID REQUIRED) find_package(OpenSSL REQUIRED) include_directories(${OPENSSL_INCLUDE_DIR}) target_link_libraries(minifi-http-curl ${CMAKE_DL_LIBS} ${OPENSSL_LIBRARIES}) +target_link_libraries(minifi-http-curl minifi-civet-extensions) find_package(ZLIB REQUIRED) include_directories(${ZLIB_INCLUDE_DIRS}) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/HTTPCurlLoader.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/HTTPCurlLoader.h b/extensions/http-curl/HTTPCurlLoader.h index 9a6e821..ec90e99 100644 --- a/extensions/http-curl/HTTPCurlLoader.h +++ b/extensions/http-curl/HTTPCurlLoader.h @@ -19,7 +19,7 @@ #define EXTENSIONS_HTTPCURLLOADER_H_ #include "c2/protocols/RESTProtocol.h" -#include "c2/protocols/RESTSender.h" +#include "protocols/RESTSender.h" #include "processors/InvokeHTTP.h" #include "client/HTTPClient.h" #include "core/ClassLoader.h"
