Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 3e23e20fe -> b8e45cbf9


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HTTPIntegrationBase.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
new file mode 100644
index 0000000..611c11f
--- /dev/null
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -0,0 +1,75 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_
+#define LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_
+
+#include "../tests/TestServer.h"
+#include "CivetServer.h"
+#include "integration/IntegrationBase.h"
+
+/**
+ * Log hook that setUrl() installs as mg_callbacks::log_message: prints every
+ * message it is handed on stdout.
+ *
+ * Declared inline because the definition lives in a header; without it,
+ * including this header from two translation units of one binary is a
+ * duplicate-symbol link error.
+ *
+ * @param conn    connection the message relates to (not used here)
+ * @param message NUL-terminated text to print
+ * @return always 1
+ */
+inline int log_message(const struct mg_connection *conn, const char *message) {
+  (void) conn;  // only present to satisfy the callback signature
+  puts(message);
+  return 1;
+}
+
+/**
+ * SSL initialisation hook that setUrl() installs as mg_callbacks::init_ssl.
+ * It leaves the offered context untouched and reports 0.
+ *
+ * NOTE(review): 0 presumably tells CivetWeb to carry on with its own
+ * certificate setup - confirm against the mg_callbacks documentation.
+ *
+ * Declared inline because the definition lives in a header.
+ *
+ * @param ssl_context SSL context offered by the server (ignored)
+ * @param user_data   user data attached to the server (ignored)
+ * @return always 0
+ */
+inline int ssl_enable(void *ssl_context, void *user_data) {
+  (void) ssl_context;
+  (void) user_data;
+  return 0;
+}
+
+/**
+ * Integration-test base that can serve the URL under test from a local
+ * CivetServer. The server started by setUrl() is passed to stop_webserver()
+ * when the harness is destroyed.
+ */
+class HTTPIntegrationBase : public IntegrationBase {
+ public:
+  HTTPIntegrationBase()
+      : IntegrationBase(),
+        server(nullptr) {
+  }
+
+  // The destructor hands `server` to stop_webserver(); a copied harness would
+  // hand the very same pointer over a second time, so copying is disabled.
+  HTTPIntegrationBase(const HTTPIntegrationBase &) = delete;
+  HTTPIntegrationBase &operator=(const HTTPIntegrationBase &) = delete;
+
+  /**
+   * Serves handler for the path of url when url points at localhost.
+   * @param url     address the handler should answer
+   * @param handler request handler, passed on by pointer
+   */
+  void setUrl(std::string url, CivetHandler *handler);
+
+  virtual ~HTTPIntegrationBase();
+
+ protected:
+  // Server started by setUrl(); stays nullptr until a localhost URL is set.
+  CivetServer *server;
+};
+
+/**
+ * Passes the server (still nullptr if setUrl() never started one) to
+ * stop_webserver().
+ *
+ * Declared inline because this out-of-class definition lives in a header.
+ */
+inline HTTPIntegrationBase::~HTTPIntegrationBase() {
+  stop_webserver(server);
+}
+
+/**
+ * Serves `handler` for `url` from a local test server.
+ *
+ * The URL is split into the port, scheme and path members. Nothing is started
+ * unless the URL contains "localhost". The first such call starts the server
+ * (using nifi-cert.pem from key_dir when the scheme is https and a key
+ * directory is set); every later call only adds another handler to it.
+ *
+ * Declared inline because this out-of-class definition lives in a header.
+ *
+ * @param url     address whose path the handler should answer
+ * @param handler request handler; it is passed on by pointer, so keep it alive
+ *                for as long as the server may call it
+ */
+inline void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
+  parse_http_components(url, port, scheme, path);
+  if (url.find("localhost") == std::string::npos) {
+    return;
+  }
+
+  // A server is already running: just attach the additional handler.
+  if (server != nullptr) {
+    server->addHandler(path, handler);
+    return;
+  }
+
+  if (scheme == "https" && !key_dir.empty()) {
+    std::string cert = key_dir + "nifi-cert.pem";
+    // Value-initialised so every hook that is not set below is null.
+    struct mg_callbacks callback = {};
+    callback.init_ssl = ssl_enable;
+    callback.log_message = log_message;
+    // CivetWeb's listening_ports syntax: an "s" suffix requests SSL on the port.
+    port += "s";
+    server = start_webserver(port, path, handler, &callback, cert, cert);
+  } else {
+    server = start_webserver(port, path, handler);
+  }
+}
+
+#endif /* LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
new file mode 100644
index 0000000..309492e
--- /dev/null
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "CivetServer.h"
+#include "sitetosite/HTTPProtocol.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "client/HTTPStream.h"
+
+/**
+ * Harness for the HTTP site-to-site scenarios: raises the relevant log levels,
+ * drops a small file into a fresh temporary directory and enables the HTTP
+ * remote-input settings before the flow is run.
+ */
+class SiteToSiteTestHarness : public HTTPIntegrationBase {
+ public:
+  explicit SiteToSiteTestHarness(bool isSecure)
+      : isSecure(isSecure) {
+    char dirTemplate[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(dirTemplate);
+  }
+
+  // Configures logging, creates <dir>/tstFile.ext and sets the properties the
+  // scenarios rely on.
+  void testSetup() {
+    auto &&logs = LogTestController::getInstance();
+    logs.setDebug<minifi::RemoteProcessorGroupPort>();
+    logs.setDebug<minifi::sitetosite::HttpSiteToSiteClient>();
+    logs.setDebug<minifi::sitetosite::SiteToSiteClient>();
+    logs.setDebug<utils::HTTPClient>();
+    logs.setTrace<minifi::controllers::SSLContextService>();
+    logs.setInfo<minifi::FlowController>();
+    logs.setDebug<core::ConfigurableComponent>();
+
+    ss << dir << "/" << "tstFile.ext";
+    std::fstream file(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+
+    configuration->set("nifi.c2.enable", "false");
+    configuration->set("nifi.remote.input.http.enabled", "true");
+    configuration->set("nifi.remote.input.socket.port", "8082");
+  }
+
+  // Pauses for three seconds.
+  virtual void waitToVerifyProcessor() {
+    const std::chrono::seconds pause(3);
+    std::this_thread::sleep_for(pause);
+  }
+
+  // Removes the file written by testSetup().
+  void cleanup() {
+    const std::string tempFile = ss.str();
+    unlink(tempFile.c_str());
+  }
+
+  // Intentionally empty: run_variance() checks the logs itself.
+  void runAssertions() {
+  }
+
+ protected:
+  bool isSecure;
+  // Temporary directory obtained from testController.
+  char *dir;
+  // Accumulates the path of the file created in testSetup().
+  std::stringstream ss;
+  TestController testController;
+};
+
+/**
+ * Switches selecting which fault, if any, a run_variance() scenario exercises.
+ * Every switch starts out false.
+ */
+struct test_profile {
+  test_profile() {
+  }
+
+  // True when no fault at all has been selected.
+  bool allFalse() const {
+    return !(flow_url_broken || transaction_url_broken || empty_transaction_url || no_delete || invalid_checksum);
+  }
+
+  // the flow-file URL is served in a broken state
+  bool flow_url_broken = false;
+  // the transaction URL answers with incorrect information
+  bool transaction_url_broken = false;
+  // the Location is left out of the transaction response
+  bool empty_transaction_url = false;
+  // no handler is registered for the delete URL
+  bool no_delete = false;
+  // the peer reports a checksum that does not match
+  bool invalid_checksum = false;
+};
+
+/**
+ * Runs one site-to-site-over-HTTP scenario against locally served mock
+ * endpoints and asserts on the log lines that scenario must produce.
+ *
+ * @param test_file_location flow configuration handed to harness.run()
+ * @param isSecure           forwarded to the harness and the controller responder
+ * @param url                base URL below which every mock endpoint is registered
+ * @param profile            the fault (if any) this scenario exercises
+ */
+void run_variance(std::string test_file_location, bool isSecure, std::string url, const struct test_profile &profile) {
+  SiteToSiteTestHarness harness(isSecure);
+
+  SiteToSiteLocationResponder responder(isSecure);
+
+  TransactionResponder transaction_response(url, "471deef6-2a6e-4a7d-912a-81cc17e3a204", true, profile.transaction_url_broken, profile.empty_transaction_url);
+
+  std::string transaction_id = transaction_response.getTransactionId();
+
+  harness.setKeyDir("");
+
+  std::string controller_loc = url + "/controller";
+
+  harness.setUrl(controller_loc, &responder);
+
+  std::string transaction_url = url + "/data-transfer/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+  std::string action_url = url + "/site-to-site/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+
+  std::string transaction_output_url = url + "/data-transfer/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+  std::string action_output_url = url + "/site-to-site/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+
+  harness.setUrl(transaction_url, &transaction_response);
+
+  std::string peer_url = url + "/site-to-site/peers";
+
+  PeerResponder peer_response(url);
+
+  harness.setUrl(peer_url, &peer_response);
+
+  std::string flow_url = action_url + "/" + transaction_id + "/flow-files";
+
+  FlowFileResponder flowResponder(true, profile.flow_url_broken, profile.invalid_checksum);
+  flowResponder.setFlowUrl(flow_url);
+  auto producedFlows = flowResponder.getFlows();
+
+  TransactionResponder transaction_response_output(url, "471deef6-2a6e-4a7d-912a-81cc17e3a203", false, profile.transaction_url_broken, profile.empty_transaction_url);
+  std::string transaction_output_id = transaction_response_output.getTransactionId();
+  transaction_response_output.setFeed(producedFlows);
+
+  harness.setUrl(transaction_output_url, &transaction_response_output);
+
+  std::string flow_output_url = action_output_url + "/" + transaction_output_id + "/flow-files";
+
+  FlowFileResponder flowOutputResponder(false, profile.flow_url_broken, profile.invalid_checksum);
+  flowOutputResponder.setFlowUrl(flow_output_url);
+  flowOutputResponder.setFeed(producedFlows);
+
+  harness.setUrl(flow_url, &flowResponder);
+  harness.setUrl(flow_output_url, &flowOutputResponder);
+
+  // The delete responders are handed to the server by pointer, so they have to
+  // stay alive until harness.run() has returned. They used to be locals of the
+  // `if` block below and were destroyed before the flow issued any request.
+  std::unique_ptr<DeleteTransactionResponder> deleteResponse;
+  std::unique_ptr<DeleteTransactionResponder> deleteOutputResponse;
+  if (!profile.no_delete) {
+    std::string delete_url = transaction_url + "/" + transaction_id;
+    deleteResponse.reset(new DeleteTransactionResponder(delete_url, "201 OK", 12));
+    harness.setUrl(delete_url, deleteResponse.get());
+
+    std::string delete_output_url = transaction_output_url + "/" + transaction_output_id;
+    deleteOutputResponse.reset(new DeleteTransactionResponder(delete_output_url, "201 OK", producedFlows));
+    harness.setUrl(delete_output_url, deleteOutputResponse.get());
+  }
+
+  harness.run(test_file_location);
+
+  // Each profile is recognised by the log line(s) its fault produces.
+  std::stringstream assertStr;
+  if (profile.allFalse()) {
+    assertStr << "Site2Site transaction " << transaction_id << " peer finished transaction";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  } else if (profile.empty_transaction_url) {
+    assert(LogTestController::getInstance().contains("Location is empty") == true);
+  } else if (profile.transaction_url_broken) {
+    assert(LogTestController::getInstance().contains("Could not create transaction, intent is ohstuff") == true);
+  } else if (profile.invalid_checksum) {
+    assertStr << "Site2Site transaction " << transaction_id << " peer confirm transaction with CRC Imawrongchecksumshortandstout";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+    assertStr.str(std::string());
+    assertStr << "Site2Site transaction " << transaction_id << " CRC not matched";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+    assertStr.str(std::string());
+    assertStr << "Site2Site delete transaction " << transaction_id;
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  } else if (profile.no_delete) {
+    assert(LogTestController::getInstance().contains("Received 401 response code from delete") == true);
+  } else {
+    // Only flow_url_broken is left at this point.
+    assertStr << "Site2Site transaction " << transaction_id << " peer unknown respond code 254";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  }
+  LogTestController::getInstance().reset();
+}
+
+/**
+ * Entry point of the HTTP site-to-site test.
+ *
+ * Arguments: argv[1] flow configuration file, argv[2] key directory,
+ * argv[3] base URL of the mock NiFi instance. The baseline scenario runs
+ * first, followed by one scenario per single fault.
+ */
+int main(int argc, char **argv) {
+  transaction_id = 0;
+  transaction_id_output = 0;
+  std::string key_dir, test_file_location, url;
+  // Read only the arguments that were really supplied; the former single
+  // `argc > 1` check also guarded the reads of argv[2] and argv[3].
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+  if (argc > 2) {
+    key_dir = argv[2];
+  }
+  if (argc > 3) {
+    url = argv[3];
+  }
+
+  const bool isSecure = url.find("https") != std::string::npos;
+
+  // Baseline: no fault selected.
+  {
+    struct test_profile profile;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  // One run per fault, in the order the scenarios have always been executed.
+  bool test_profile::* const single_faults[] = {
+      &test_profile::flow_url_broken,
+      &test_profile::empty_transaction_url,
+      &test_profile::transaction_url_broken,
+      &test_profile::no_delete,
+      &test_profile::invalid_checksum };
+  for (bool test_profile::* fault : single_faults) {
+    struct test_profile profile;
+    profile.*fault = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
new file mode 100644
index 0000000..df40497
--- /dev/null
+++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestServer.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+
+// Blocks the calling thread for ten seconds; main() calls it right after
+// starting the controller.
+void waitToVerifyProcessor() {
+  const std::chrono::seconds settle_time(10);
+  std::this_thread::sleep_for(settle_time);
+}
+
+// Log hook assigned to mg_callbacks::log_message in main(): prints the message
+// on stdout and always answers 1. The connection argument is not looked at.
+int log_message(const struct mg_connection *conn, const char *message) {
+  (void) conn;
+  puts(message);
+  const int result = 1;
+  return result;
+}
+
+/**
+ * SSL initialisation hook assigned to mg_callbacks::init_ssl in main(). It
+ * leaves the offered context untouched and reports 0.
+ *
+ * NOTE(review): 0 presumably tells CivetWeb to carry on with its own
+ * certificate setup - confirm against the mg_callbacks documentation.
+ *
+ * @param ssl_context SSL context offered by the server (ignored)
+ * @param user_data   user data attached to the server (ignored)
+ * @return always 0
+ */
+int ssl_enable(void *ssl_context, void *user_data) {
+  (void) ssl_context;
+  (void) user_data;
+  return 0;
+}
+
+/**
+ * Handler that answers every GET with a fixed plain-text body.
+ */
+class HttpResponder : public CivetHandler {
+ public:
+  /**
+   * Writes a 200 response whose body is "hi this is a get test".
+   * @param server server the request arrived on (unused)
+   * @param conn   connection to write the response to
+   * @return always true
+   */
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    static const std::string site2site_rest_resp = "hi this is a get test";
+    // %lu expects an unsigned long, but length() yields a size_t, which has a
+    // different width on some targets - convert explicitly.
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              static_cast<unsigned long>(site2site_rest_resp.length()));
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+};
+
+/**
+ * Entry point of the InvokeHTTP GET integration test.
+ *
+ * Arguments: argv[1] flow configuration file, argv[2] key directory.
+ * The URL of the "invoke" processor is read from the flow; when it points at
+ * localhost a test server answering that URL is started. The flow is then run
+ * and the captured log output is checked for the attributes of a 200 response.
+ */
+int main(int argc, char **argv) {
+  init_webserver();
+  LogTestController::getInstance().setDebug<core::Processor>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setDebug<utils::HTTPClient>();
+  LogTestController::getInstance().setDebug<minifi::controllers::SSLContextService>();
+  LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+  std::string key_dir, test_file_location;
+  // Read only the arguments that were really supplied; the former single
+  // `argc > 1` check also guarded the read of argv[2].
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+  if (argc > 2) {
+    key_dir = argv[2];
+  }
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  content_repo->initialize(configuration);
+
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+                                                                                                content_repo,
+                                                                                                DEFAULT_ROOT_GROUP_NAME,
+                                                                                                true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+  // Move ownership of the root group straight into the shared_ptr. Building it
+  // from ptr.get() and calling ptr.release() later left two owners in between.
+  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg(std::move(ptr));
+  std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+  assert(proc != nullptr);
+
+  std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+  assert(inv != nullptr);
+  std::string url = "";
+  inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+  HttpResponder h_ex;
+  std::string port, scheme, path;
+  CivetServer *server = nullptr;
+
+  parse_http_components(url, port, scheme, path);
+  if (url.find("localhost") != std::string::npos) {
+    if (scheme == "https") {
+      std::string cert = key_dir + "nifi-cert.pem";
+      // Value-initialised so every hook that is not set below is null.
+      struct mg_callbacks callback = {};
+      callback.init_ssl = ssl_enable;
+      callback.log_message = log_message;
+      // CivetWeb's listening_ports syntax: an "s" suffix requests SSL on the port.
+      port += "s";
+      server = start_webserver(port, path, &h_ex, &callback, cert, cert);
+    } else {
+      server = start_webserver(port, path, &h_ex);
+    }
+  }
+  controller->load();
+  controller->start();
+  waitToVerifyProcessor();
+
+  controller->waitUnload(60000);
+  // A flow that does not target localhost cannot be verified here: fail the run.
+  if (url.find("localhost") == std::string::npos) {
+    stop_webserver(server);
+    exit(1);
+  }
+  std::string logs = LogTestController::getInstance().log_output.str();
+
+  assert(logs.find("key:filename value:") != std::string::npos);
+  assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos);
+  assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos);
+
+  LogTestController::getInstance().reset();
+  rmdir("./content_repository");
+  stop_webserver(server);
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
new file mode 100644
index 0000000..7b5ca97
--- /dev/null
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+#include <sstream>
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "../tests/TestServer.h"
+#include "HTTPIntegrationBase.h"
+
+/**
+ * Harness for the HTTP POST integration test: raises the log levels of the
+ * components involved, drops a small file into a fresh temporary directory and
+ * afterwards checks the captured log output.
+ */
+class HttpTestHarness : public HTTPIntegrationBase {
+ public:
+  HttpTestHarness() {
+    char dirTemplate[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(dirTemplate);
+  }
+
+  // Configures logging, creates <dir>/tstFile.ext and adjusts the engine settings.
+  void testSetup() {
+    auto &&logs = LogTestController::getInstance();
+    logs.setDebug<minifi::FlowController>();
+    logs.setDebug<core::ProcessGroup>();
+    logs.setDebug<minifi::SchedulingAgent>();
+    logs.setDebug<core::ProcessContext>();
+    logs.setDebug<processors::InvokeHTTP>();
+    logs.setDebug<utils::HTTPClient>();
+    logs.setDebug<processors::ListenHTTP>();
+    logs.setDebug<processors::ListenHTTP::WriteCallback>();
+    logs.setDebug<processors::ListenHTTP::Handler>();
+    logs.setDebug<processors::LogAttribute>();
+    logs.setDebug<core::Processor>();
+    logs.setDebug<minifi::ThreadedSchedulingAgent>();
+    logs.setDebug<minifi::TimerDrivenSchedulingAgent>();
+    logs.setDebug<minifi::core::ProcessSession>();
+
+    ss << dir << "/" << "tstFile.ext";
+    std::fstream file(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+
+    configuration->set("nifi.flow.engine.threads", "8");
+    configuration->set("nifi.c2.enable", "false");
+  }
+
+  // Removes the file written by testSetup().
+  void cleanup() {
+    const std::string tempFile = ss.str();
+    unlink(tempFile.c_str());
+  }
+
+  // The log must show the curl call and the 1024-byte record, and must not
+  // show an empty record.
+  void runAssertions() {
+    auto &&logs = LogTestController::getInstance();
+    assert(logs.contains("curl performed") == true);
+    assert(logs.contains("Size:1024 Offset:0") == true);
+    assert(logs.contains("Size:0 Offset:0") == false);
+  }
+
+ protected:
+  // Temporary directory obtained from testController.
+  char *dir;
+  // Accumulates the path of the file created in testSetup().
+  std::stringstream ss;
+  TestController testController;
+};
+
+/**
+ * Entry point of the HTTP POST integration test.
+ *
+ * Arguments: argv[1] flow configuration file, argv[2] key directory.
+ */
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location;
+  // Read only the arguments that were really supplied; the former single
+  // `argc > 1` check also guarded the read of argv[2].
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+  if (argc > 2) {
+    key_dir = argv[2];
+  }
+
+  HttpTestHarness harness;
+
+  harness.setKeyDir(key_dir);
+
+  harness.run(test_file_location);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/SiteToSiteRestTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/SiteToSiteRestTest.cpp b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
new file mode 100644
index 0000000..2cf0955
--- /dev/null
+++ b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "../tests/TestServer.h"
+#include "HTTPIntegrationBase.h"
+
+/**
+ * Handler that answers GET requests with a small JSON document describing a
+ * controller; its "siteToSiteSecure" field mirrors the flag given at
+ * construction.
+ */
+class Responder : public CivetHandler {
+ public:
+  explicit Responder(bool isSecure)
+      : isSecure(isSecure) {
+  }
+
+  /**
+   * Writes a 200 response carrying the controller JSON.
+   * @param server server the request arrived on (unused)
+   * @param conn   connection to write the response to
+   * @return always true
+   */
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    std::string site2site_rest_resp = "{"
+        "\"revision\": {"
+        "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
+        "},"
+        "\"controller\": {"
+        "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
+        "\"name\": \"NiFi Flow\","
+        "\"remoteSiteListeningPort\": 10001,"
+        "\"siteToSiteSecure\": ";
+    site2site_rest_resp += (isSecure ? "true" : "false");
+    site2site_rest_resp += "}}";
+    // %lu expects an unsigned long, but length() yields a size_t, which has a
+    // different width on some targets - convert explicitly.
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              static_cast<unsigned long>(site2site_rest_resp.length()));
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+
+ protected:
+  // Value reported in the "siteToSiteSecure" field.
+  bool isSecure;
+};
+
+/**
+ * Harness for the site-to-site REST test: raises the relevant log levels,
+ * drops a small file into a fresh temporary directory and afterwards checks
+ * that the secure flag reported by the mock controller reached the log.
+ */
+class SiteToSiteTestHarness : public HTTPIntegrationBase {
+ public:
+  explicit SiteToSiteTestHarness(bool isSecure)
+      : isSecure(isSecure) {
+    char dirTemplate[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(dirTemplate);
+  }
+
+  // Configures logging and creates <dir>/tstFile.ext.
+  void testSetup() {
+    auto &&logs = LogTestController::getInstance();
+    logs.setDebug<minifi::RemoteProcessorGroupPort>();
+    logs.setDebug<utils::HTTPClient>();
+    logs.setTrace<minifi::controllers::SSLContextService>();
+    logs.setInfo<minifi::FlowController>();
+    logs.setDebug<core::ConfigurableComponent>();
+
+    ss << dir << "/" << "tstFile.ext";
+    std::fstream file(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+  }
+
+  // Removes the file written by testSetup().
+  void cleanup() {
+    const std::string tempFile = ss.str();
+    unlink(tempFile.c_str());
+  }
+
+  // The logged secure flag has to match the mode the harness was created with.
+  void runAssertions() {
+    const char *expected = isSecure ? "process group remote site2site port 10001, is secure 1"
+                                    : "process group remote site2site port 10001, is secure 0";
+    assert(LogTestController::getInstance().contains(expected) == true);
+  }
+
+ protected:
+  bool isSecure;
+  // Temporary directory obtained from testController.
+  char *dir;
+  // Accumulates the path of the file created in testSetup().
+  std::stringstream ss;
+  TestController testController;
+};
+
+/**
+ * Entry point of the site-to-site REST test.
+ *
+ * Arguments: argv[1] flow configuration file, argv[2] key directory,
+ * argv[3] URL the mock controller is served on. A URL containing "https"
+ * selects the secure variant.
+ */
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  // Read only the arguments that were really supplied; the former single
+  // `argc > 1` check also guarded the reads of argv[2] and argv[3].
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+  if (argc > 2) {
+    key_dir = argv[2];
+  }
+  if (argc > 3) {
+    url = argv[3];
+  }
+
+  const bool isSecure = url.find("https") != std::string::npos;
+
+  SiteToSiteTestHarness harness(isSecure);
+
+  Responder responder(isSecure);
+
+  harness.setKeyDir(key_dir);
+
+  harness.setUrl(url, &responder);
+
+  harness.run(test_file_location);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/TestServer.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/TestServer.h b/extensions/http-curl/tests/TestServer.h
new file mode 100644
index 0000000..06f996c
--- /dev/null
+++ b/extensions/http-curl/tests/TestServer.h
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_TEST_TESTSERVER_H_
+#define LIBMINIFI_TEST_TESTSERVER_H_
+#include <regex.h>
+#include <string>
+#include <iostream>
+#include "civetweb.h"
+#include "CivetServer.h"
+
+
+/* String with internal linkage; nothing else in this header references it.
+ * NOTE(review): the earlier comment labelled this a "server context handle",
+ * which does not describe a std::string - confirm the intended use. */
+static std::string resp_str;
+
+/**
+ * Initializes the civetweb library by calling mg_init_library with a
+ * feature argument of 0.
+ *
+ * Declared inline because it is defined in a header; a plain definition
+ * would be emitted by every translation unit that includes this file.
+ */
+inline void init_webserver() {
+  mg_init_library(0);
+}
+
+
+/**
+ * Creates a CivetServer configured with SSL options and registers a handler.
+ *
+ * @param port      value for the "listening_ports" option
+ * @param rooturi   URI the handler is registered under
+ * @param handler   handler attached to rooturi
+ * @param callbacks not used by this function
+ * @param cert      not used by this function
+ * @param ca_cert   value for the "ssl_certificate" option
+ * @return a server allocated with new; the caller is responsible for releasing it
+ *
+ * NOTE(review): callbacks and cert are accepted but ignored, and ca_cert is
+ * what gets passed as "ssl_certificate" - confirm this is intended.
+ *
+ * Declared inline because the definition lives in a header.
+ */
+inline CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) {
+  // Build the option list directly instead of copying it out of a C array.
+  std::vector<std::string> cpp_options = { "listening_ports", port, "error_log_file", "error.log", "ssl_certificate", ca_cert,
+      "ssl_protocol_version", "0", "ssl_cipher_list", "ALL", "ssl_verify_peer", "no" };
+
+  CivetServer *server = new CivetServer(cpp_options);
+
+  server->addHandler(rooturi, handler);
+
+  return server;
+}
+
+/**
+ * Creates a plain CivetServer that serves the current directory as its
+ * document root and registers a handler.
+ *
+ * @param port    value for the "listening_ports" option
+ * @param rooturi URI the handler is registered under
+ * @param handler handler attached to rooturi
+ * @return a server allocated with new; the caller is responsible for releasing it
+ *
+ * Declared inline because the definition lives in a header.
+ */
+inline CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) {
+  // Build the option list directly instead of copying it out of a C array.
+  std::vector<std::string> cpp_options = { "document_root", ".", "listening_ports", port };
+
+  CivetServer *server = new CivetServer(cpp_options);
+
+  server->addHandler(rooturi, handler);
+
+  return server;
+}
+
+/**
+ * Splits a URL of the form (http|https)://localhost:<port><path> into parts.
+ *
+ * @param url    URL to parse
+ * @param port   receives the digits following "localhost:"
+ * @param scheme receives "http" or "https"
+ * @param path   receives the path, starting at the first '/'
+ * @return true when the URL matched and port, scheme and path are all
+ *         non-empty; false otherwise (including a regex compilation failure)
+ *
+ * Declared inline because the definition lives in a header.
+ */
+inline bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
+  const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
+  // Whole match plus the four capture groups in regexstr.
+  constexpr size_t kGroupCount = 5;
+
+  regex_t regex;
+  if (regcomp(&regex, regexstr, REG_EXTENDED) != 0) {
+    return false;
+  }
+
+  regmatch_t groups[kGroupCount];
+  const bool matched = regexec(&regex, url.c_str(), kGroupCount, groups, 0) == 0;
+  // The match offsets do not depend on the compiled pattern, so release it
+  // right away; this way no return path can leak it.
+  regfree(&regex);
+
+  if (!matched) {
+    return false;
+  }
+
+  for (size_t i = 0; i < kGroupCount; i++) {
+    // Stop at the first group that did not participate in the match.
+    if (groups[i].rm_so == -1)
+      break;
+
+    std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so);
+    switch (i) {
+      case 1:
+        scheme = str;
+        break;
+      case 3:
+        port = str;
+        break;
+      case 4:
+        path = str;
+        break;
+      default:
+        break;
+    }
+  }
+
+  return !(path.empty() || scheme.empty() || port.empty());
+}
+
+/**
+ * Destroys the given server (a null pointer is accepted) and then
+ * un-initializes the civetweb library.
+ */
+static void stop_webserver(CivetServer *server) {
+  // delete on a null pointer is a no-op, so no explicit guard is required.
+  delete server;
+
+  mg_exit_library();
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/ThreadPoolAdjust.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/ThreadPoolAdjust.cpp 
b/extensions/http-curl/tests/ThreadPoolAdjust.cpp
new file mode 100644
index 0000000..13524d6
--- /dev/null
+++ b/extensions/http-curl/tests/ThreadPoolAdjust.cpp
@@ -0,0 +1,115 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "processors/InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+
+/**
+ * Integration harness that runs a flow with a single flow-engine thread and
+ * checks the resulting log output.
+ */
+class HttpTestHarness : public IntegrationBase {
+ public:
+  /** Creates the temporary working directory used by the test. */
+  HttpTestHarness() {
+    char dir_template[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(dir_template);
+  }
+
+  /**
+   * Enables debug logging for the components under test, writes the scratch
+   * file tstFile.ext into the temp directory and sets
+   * nifi.flow.engine.threads to "1".
+   */
+  void testSetup() {
+    auto &&log_controller = LogTestController::getInstance();
+    log_controller.setDebug<minifi::FlowController>();
+    log_controller.setDebug<core::ProcessGroup>();
+    log_controller.setDebug<minifi::SchedulingAgent>();
+    log_controller.setDebug<core::ProcessContext>();
+    log_controller.setDebug<processors::InvokeHTTP>();
+    log_controller.setDebug<utils::HTTPClient>();
+    log_controller.setDebug<processors::ListenHTTP>();
+    log_controller.setDebug<processors::ListenHTTP::WriteCallback>();
+    log_controller.setDebug<processors::ListenHTTP::Handler>();
+    log_controller.setDebug<processors::LogAttribute>();
+    log_controller.setDebug<core::Processor>();
+    log_controller.setDebug<minifi::ThreadedSchedulingAgent>();
+    log_controller.setDebug<minifi::TimerDrivenSchedulingAgent>();
+    log_controller.setDebug<minifi::core::ProcessSession>();
+
+    // The path is accumulated in ss so that cleanup() can unlink the same file.
+    ss << dir << "/tstFile.ext";
+    std::fstream scratch(ss.str(), std::ios::out);
+    scratch << "tempFile";
+    scratch.close();
+
+    configuration->set("nifi.flow.engine.threads", "1");
+  }
+
+  /** Removes the scratch file whose path was accumulated in ss. */
+  void cleanup() {
+    const std::string scratch_path = ss.str();
+    unlink(scratch_path.c_str());
+  }
+
+  /**
+   * Asserts that the log shows a performed curl request and a 1024-byte
+   * payload at offset 0, and that no zero-sized payload was logged.
+   */
+  void runAssertions() {
+    auto &&log_controller = LogTestController::getInstance();
+    assert(log_controller.contains("curl performed") == true);
+    assert(log_controller.contains("Size:1024 Offset:0") == true);
+    assert(log_controller.contains("Size:0 Offset:0") == false);
+  }
+
+ protected:
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+/**
+ * Entry point of the thread-pool adjustment integration test.
+ *
+ * Positional arguments (each optional): argv[1] is the test flow file
+ * location and argv[2] the key directory.
+ */
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location;
+  // Read every argument only when it was actually supplied; indexing argv[2]
+  // with a single argument would read past the provided values.
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+  if (argc > 2) {
+    key_dir = argv[2];
+  }
+
+  HttpTestHarness harness;
+
+  harness.setKeyDir(key_dir);
+
+  harness.run(test_file_location);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp 
b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
new file mode 100644
index 0000000..81d2714
--- /dev/null
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -0,0 +1,315 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include "FlowController.h"
+#include "io/BaseStream.h"
+#include "TestBase.h"
+#include "processors/GetFile.h"
+#include "core/Core.h"
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "core/FlowFile.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "processors/InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+
+// Wires a ListenHTTP processor (port 8686, base path /testytesttest) to an
+// InvokeHTTP processor that POSTs to the same URL, triggers both by hand and
+// checks that InvokeHTTP logs "exiting because method is POST".
+TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
+  TestController testController;
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
+
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+  // NOTE(review): getfileprocessor, logAttribute, dir, gcConnection and
+  // laConnection are not used further in this test; kept as-is because any
+  // side effects of constructing them are not visible here.
+  std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
+
+  std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
+  uuid_t processoruuid;
+  REQUIRE(true == listenhttp->getUUID(processoruuid));
+
+  uuid_t invokehttp_uuid;
+  REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
+
+  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
+  gcConnection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
+  laConnection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
+  connection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
+
+  connection2->setRelationship(core::Relationship("No Retry", "description"));
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(listenhttp);
+
+  connection2->setSourceUUID(invokehttp_uuid);
+  connection->setSourceUUID(processoruuid);
+  connection->setDestinationUUID(invokehttp_uuid);
+
+  listenhttp->addConnection(connection);
+  invokehttp->addConnection(connection);
+  invokehttp->addConnection(connection2);
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
+  std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+  std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
+  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686");
+  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
+
+  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
+  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest");
+  auto session = std::make_shared<core::ProcessSession>(context);
+  auto session2 = std::make_shared<core::ProcessSession>(context2);
+
+  REQUIRE(listenhttp->getName() == "listenhttp");
+
+  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
+
+  std::shared_ptr<core::FlowFile> record;
+  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
+  listenhttp->onSchedule(context, factory);
+  listenhttp->onTrigger(context, session);
+
+  invokehttp->incrementActiveTasks();
+  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
+  std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
+  invokehttp->onSchedule(context2, factory2);
+  invokehttp->onTrigger(context2, session2);
+
+  // After the first pass the ListenHTTP session is expected to hold neither a
+  // flow file nor any provenance events.
+  provenance::ProvenanceReporter *reporter = session->getProvenanceReporter();
+  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
+  record = session->get();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  listenhttp->incrementActiveTasks();
+  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
+  listenhttp->onTrigger(context, session);
+
+  reporter = session->getProvenanceReporter();
+
+  records = reporter->getEvents();
+  session->commit();
+
+  invokehttp->incrementActiveTasks();
+  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
+  invokehttp->onTrigger(context2, session2);
+
+  session2->commit();
+  records = reporter->getEvents();
+
+  // Any provenance event recorded on this reporter must name listenhttp.
+  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+    REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
+  }
+  std::shared_ptr<core::FlowFile> ffr = session2->get();
+  REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
+  LogTestController::getInstance().reset();
+}
+
+/**
+ * Output stream callback that writes a fixed test string to the stream it
+ * is handed and returns the result of that write.
+ */
+class CallBack : public minifi::OutputStreamCallback {
+ public:
+  CallBack() = default;
+  virtual ~CallBack() = default;
+
+  virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+    // leaving the typo for posterity sake
+    std::string payload = "we're gnna write some test stuff";
+    // write() is called with a mutable uint8_t pointer, hence the casts.
+    uint8_t *bytes = reinterpret_cast<uint8_t*>(const_cast<char*>(payload.c_str()));
+    return stream->write(bytes, payload.length());
+  }
+};
+
+// Same wiring as the no-resource-claim test, but on port 8680 and with a flow
+// file whose content is written through CallBack before InvokeHTTP is
+// triggered; checks that InvokeHTTP logs "exiting because method is POST".
+TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
+  TestController testController;
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
+
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+  // NOTE(review): getfileprocessor, logAttribute, dir, gcConnection and
+  // laConnection are not used further in this test; kept as-is because any
+  // side effects of constructing them are not visible here.
+  std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
+
+  std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
+  uuid_t processoruuid;
+  REQUIRE(true == listenhttp->getUUID(processoruuid));
+
+  uuid_t invokehttp_uuid;
+  REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
+  gcConnection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
+  laConnection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
+  connection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
+
+  connection2->setRelationship(core::Relationship("No Retry", "description"));
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(listenhttp);
+
+  connection->setSourceUUID(invokehttp_uuid);
+  connection->setDestinationUUID(processoruuid);
+
+  // NOTE(review): setSourceUUID is called twice with the same UUID; the second
+  // call may have been meant to set a destination - confirm.
+  connection2->setSourceUUID(processoruuid);
+  connection2->setSourceUUID(processoruuid);
+
+  listenhttp->addConnection(connection);
+  invokehttp->addConnection(connection);
+  invokehttp->addConnection(connection2);
+
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
+  std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+  std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
+  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680");
+  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
+
+  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
+  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8680/testytesttest");
+  auto session = std::make_shared<core::ProcessSession>(context);
+  auto session2 = std::make_shared<core::ProcessSession>(context2);
+
+  REQUIRE(listenhttp->getName() == "listenhttp");
+
+  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
+
+  std::shared_ptr<core::FlowFile> record;
+
+  CallBack callback;
+
+  // Create a flow file with one attribute and write its content via CallBack
+  // on the InvokeHTTP session.
+  std::map<std::string, std::string> attributes;
+  attributes["testy"] = "test";
+  std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes);
+  session2->write(flow, &callback);
+
+  invokehttp->incrementActiveTasks();
+  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
+  std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
+  invokehttp->onSchedule(context2, factory2);
+  invokehttp->onTrigger(context2, session2);
+
+  listenhttp->incrementActiveTasks();
+  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
+  listenhttp->onSchedule(context, factory);
+  listenhttp->onTrigger(context, session);
+
+  // After the first pass the ListenHTTP session is expected to hold neither a
+  // flow file nor any provenance events.
+  provenance::ProvenanceReporter *reporter = session->getProvenanceReporter();
+  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
+  record = session->get();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  listenhttp->incrementActiveTasks();
+  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
+  listenhttp->onTrigger(context, session);
+
+  reporter = session->getProvenanceReporter();
+
+  records = reporter->getEvents();
+  session->commit();
+
+  invokehttp->incrementActiveTasks();
+  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
+  invokehttp->onTrigger(context2, session2);
+
+  session2->commit();
+  records = reporter->getEvents();
+
+  // Any provenance event recorded on this reporter must name listenhttp.
+  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+    REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
+  }
+  std::shared_ptr<core::FlowFile> ffr = session2->get();
+  REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
+  LogTestController::getInstance().reset();
+}
+
+// Builds a ListenHTTP -> InvokeHTTP plan through TestPlan (port 8685, base
+// path /testytesttest, method POST), runs it twice and checks that
+// InvokeHTTP logs "exiting because method is POST".
+TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
+  TestController testController;
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::ListenHTTP>();
+  LogTestController::getInstance().setInfo<core::Processor>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> processor = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false);
+  std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
+
+  REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685"));
+  REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest"));
+
+  REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST"));
+  REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest"));
+  plan->reset();
+  testController.runSession(plan, true);
+
+  // After the first run neither a current flow file nor provenance records
+  // are expected.
+  std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  plan->reset();
+  testController.runSession(plan, true);
+
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
+
+  // Any provenance record produced must name the ListenHTTP processor.
+  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+    REQUIRE(provEventRecord->getComponentType() == processor->getName());
+  }
+  std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile();
+  REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
+  LogTestController::getInstance().reset();
+}

Reply via email to