Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147542584
  
    --- Diff: libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h ---
    @@ -0,0 +1,322 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "civetweb.h"
    +#include "CivetServer.h"
    +#include "concurrentqueue.h"
    +
    +
    +
    +#include "CivetStream.h"
    +#include "io/CRCStream.h"
    +#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
    +#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
    +static std::atomic<int> transaction_id;
    +static std::atomic<int> transaction_id_output;
    +
    +class FlowObj {
    + public:
    +  FlowObj()
    +      : total_size(0) {
    +
    +  }
    +  explicit FlowObj(const FlowObj &&other)
    +      : attributes(std::move(other.attributes)),
    +        total_size(std::move(other.total_size)),
    +        data(std::move(other.data)) {
    +
    +  }
    +  uint64_t total_size;
    +  std::map<std::string, std::string> attributes;
    +  std::vector<uint8_t> data;
    +
    +};
    +
    +class SiteToSiteLocationResponder : public CivetHandler {
    + public:
    +  explicit SiteToSiteLocationResponder(bool isSecure)
    +      : isSecure(isSecure) {
    +  }
    +  bool handleGet(CivetServer *server, struct mg_connection *conn) {
    +    std::string site2site_rest_resp = "{"
    +        "\"revision\": {"
    +        "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
    +        "},"
    +        "\"controller\": {"
    +        "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
    +        "\"name\": \"NiFi Flow\","
    +        "\"siteToSiteSecure\": ";
    +    site2site_rest_resp += (isSecure ? "true" : "false");
    +    site2site_rest_resp += "}}";
    +    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
    +              "text/plain\r\nContent-Length: %lu\r\nConnection: 
close\r\n\r\n",
    +              site2site_rest_resp.length());
    +    mg_printf(conn, "%s", site2site_rest_resp.c_str());
    +    return true;
    +  }
    +
    + protected:
    +  bool isSecure;
    +};
    +
    +class PeerResponder : public CivetHandler {
    + public:
    +
    +  explicit PeerResponder(const std::string base_url)
    +      : base_url(base_url) {
    +  }
    +
    +  bool handleGet(CivetServer *server, struct mg_connection *conn) {
    +    std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": 
\"localhost\", \"port\": 8082,  \"secure\": false, \"flowFileCount\" : 0 }] }";
    +    std::stringstream headers;
    +    headers << "HTTP/1.1 200 OK\r\nContent-Type: 
application/json\r\nContent-Length: " << site2site_rest_resp.length() << 
"\r\nConnection: close\r\n\r\n";
    +    mg_printf(conn, "%s", headers.str().c_str());
    +    mg_printf(conn, "%s", site2site_rest_resp.c_str());
    +    return true;
    +  }
    +
    + protected:
    +  std::string base_url;
    +};
    +
    +class TransactionResponder : public CivetHandler {
    + public:
    +
    +  explicit TransactionResponder(const std::string base_url, std::string 
port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri)
    +      : base_url(base_url),
    +        wrong_uri(wrong_uri),
    +        empty_transaction_uri(empty_transaction_uri),
    +        input_port(input_port),
    +        port_id(port_id),
    +        flow_files_feed_(nullptr) {
    +
    +    if (input_port) {
    +      transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96";
    +      transaction_id_str += std::to_string(transaction_id.load());
    +      transaction_id++;
    +    } else {
    +      transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95";
    +      transaction_id_str += std::to_string(transaction_id_output.load());
    +      transaction_id_output++;
    +    }
    +  }
    +
    +  bool handlePost(CivetServer *server, struct mg_connection *conn) {
    +    std::string site2site_rest_resp = "";
    +    std::stringstream headers;
    +    headers << "HTTP/1.1 201 OK\r\nContent-Type: 
application/json\r\nContent-Length: " << site2site_rest_resp.length() << 
"\r\nx-location-uri-intent: ";
    +    if (wrong_uri)
    +      headers << "ohstuff\r\n";
    +    else
    +      headers << "transaction-url\r\n";
    +
    +    std::string port_type;
    +
    +    if (input_port)
    +      port_type = "input-ports";
    +    else
    +      port_type = "output-ports";
    +    if (!empty_transaction_uri)
    +      headers << "Location: " << base_url << "/site-to-site/" << port_type 
<< "/" << port_id << "/transactions/" << transaction_id_str << "\r\n";
    +    headers << "Connection: close\r\n\r\n";
    +    mg_printf(conn, "%s", headers.str().c_str());
    +    mg_printf(conn, "%s", site2site_rest_resp.c_str());
    +    return true;
    +  }
    +
    +  void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> 
*feed) {
    +    flow_files_feed_ = feed;
    +  }
    +
    +  std::string getTransactionId() {
    +    return transaction_id_str;
    +  }
    + protected:
    +  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
    +  std::string transaction_id_str;
    +  std::string base_url;bool wrong_uri;bool empty_transaction_uri;bool 
input_port;
    +  std::string port_id;
    +};
    +
    +class FlowFileResponder : public CivetHandler {
    +
    +  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_;
    +  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
    +
    + public:
    +
    +  explicit FlowFileResponder(bool input_port, bool wrong_uri, bool 
invalid_checksum)
    +      : wrong_uri(wrong_uri),
    +        input_port(input_port),
    +        flow_files_feed_(nullptr),
    +        invalid_checksum(invalid_checksum) {
    +  }
    +
    +  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() {
    +    return &flow_files_;
    +  }
    +
    +  void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> 
*feed) {
    +    flow_files_feed_ = feed;
    +  }
    +
    +  bool handlePost(CivetServer *server, struct mg_connection *conn) {
    +    std::string site2site_rest_resp = "";
    +    std::stringstream headers;
    +
    +    if (!wrong_uri) {
    +      minifi::io::CivetStream civet_stream(conn);
    +      minifi::io::CRCStream<minifi::io::CivetStream> stream(&civet_stream);
    +      uint32_t num_attributes;
    +      uint64_t total_size = 0;
    +      total_size += stream.read(num_attributes);
    +
    +      auto flow = std::make_shared<FlowObj>();
    +
    +      for (int i = 0; i < num_attributes; i++) {
    +        std::string name, value;
    +        total_size += stream.readUTF(name, true);
    +        total_size += stream.readUTF(value, true);
    +        flow->attributes[name] = value;
    +      }
    +      uint64_t length;
    +      total_size += stream.read(length);
    +
    +      total_size += length;
    +      flow->data.resize(length);
    +      flow->total_size = total_size;
    +
    +      assert(stream.readData(flow->data.data(), length) == length);
    +
    +      assert(flow->attributes["path"] == ".");
    +      assert(!flow->attributes["uuid"].empty());
    +      assert(!flow->attributes["filename"].empty());
    +
    +      if (!invalid_checksum) {
    +        site2site_rest_resp = std::to_string(stream.getCRC());
    +        flow_files_.enqueue(flow);
    +      } else {
    +        site2site_rest_resp = "Imawrongchecksumshortandstout";
    +      }
    +
    +      headers << "HTTP/1.1 202 OK\r\nContent-Type: 
application/json\r\nContent-Length: " << site2site_rest_resp.length() << 
"\r\nConnection: close\r\n\r\n";
    +    } else {
    +      headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n";
    +    }
    +
    +    mg_printf(conn, "%s", headers.str().c_str());
    +    mg_printf(conn, "%s", site2site_rest_resp.c_str());
    +    return true;
    +  }
    +
    +  bool handleGet(CivetServer *server, struct mg_connection *conn) {
    +
    +    if (flow_files_feed_->size_approx() > 0) {
    +      std::shared_ptr<FlowObj> flow;
    +      uint8_t buf[1];
    +      std::vector<std::shared_ptr<FlowObj>> flows;
    +      uint64_t total = 0;
    +
    +      while (flow_files_feed_->try_dequeue(flow)) {
    +        flows.push_back(flow);
    +        total += flow->total_size;
    +      }
    +      mg_printf(conn, "HTTP/1.1 200 OK\r\n"
    +                "Content-Length: %llu\r\n"
    +                "Content-Type: application/octet-stream\r\n"
    +                "Connection: close\r\n\r\n",
    +                total);
    +      minifi::io::BaseStream serializer;
    +      minifi::io::CRCStream<minifi::io::BaseStream> stream(&serializer);
    +      for (auto flow : flows) {
    +        uint32_t num_attributes = flow->attributes.size();
    +        stream.write(num_attributes);
    +        for (auto entry : flow->attributes) {
    +          stream.writeUTF(entry.first);
    +          stream.writeUTF(entry.second);
    +        }
    +        uint64_t length = flow->data.size();
    +        stream.write(length);
    +        stream.writeData(flow->data.data(), length);
    +      }
    +      auto ret = mg_write(conn, serializer.getBuffer(), total);
    +    } else {
    +      std::cout << "Nothing to transfer feed" << std::endl;
    +      mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: "
    +                "close\r\nContent-Length: 0\r\n");
    +      mg_printf(conn, "Content-Type: text/plain\r\n\r\n");
    +
    +    }
    +    return true;
    +  }
    +
    +  void setFlowUrl(std::string flowUrl) {
    +    base_url = flowUrl;
    +  }
    +
    + protected:
    +// base url
    +  std::string base_url;
    +// set the wrong url
    +  bool wrong_uri;
    +// we are running an input port
    +  bool input_port;
    +// invalid checksum is returned.
    +  bool invalid_checksum;
    +};
    +
    +class DeleteTransactionResponder : public CivetHandler {
    + public:
    +
    +  explicit DeleteTransactionResponder(const std::string base_url, 
std::string response_code, int expected_resp_code)
    +      : base_url(base_url),
    +        response_code(response_code),
    +        flow_files_feed_(nullptr) {
    +    expected_resp_code_str = std::to_string(expected_resp_code);
    +  }
    +
    +  explicit DeleteTransactionResponder(const std::string base_url, 
std::string response_code, 
moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed)
    +      : base_url(base_url),
    +        response_code(response_code),
    +        flow_files_feed_(feed) {
    +  }
    +
    +  bool handleDelete(CivetServer *server, struct mg_connection *conn) {
    +
    +    std::string site2site_rest_resp = "";
    --- End diff --
    
    Should do some additional verification here if possible. 


---

Reply via email to