fgerlits commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2210955235


##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/HttpSiteToSiteClient.h"
+
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include <vector>
+#include <optional>
+
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+
+#undef DELETE  // macro on windows
+
+namespace org::apache::nifi::minifi::sitetosite {
+
+namespace {
+// Parses a transaction ID out of a transaction URL: whatever partAfterLastOccurrenceOf() yields for
+// the '/' separator is handed to utils::Identifier::parse(), whose result is returned unchanged.
+std::optional<utils::Identifier> parseTransactionId(const std::string &uri) {
+  const auto last_segment = utils::string::partAfterLastOccurrenceOf(uri, '/');
+  return utils::Identifier::parse(last_segment);
+}
+
+/**
+ * Parses the JSON body of a site-to-site "peers" response into a list of peer statuses.
+ *
+ * Each element of the top-level "peers" array is expected to be an object with a string "hostname",
+ * an integer "port" and, optionally, an integer "flowFileCount". Entries without a non-empty hostname
+ * and a positive port are skipped with a debug log; a missing or unusable "flowFileCount" is treated as 0.
+ *
+ * @param logger logger that receives the debug diagnostics
+ * @param entity JSON text to parse
+ * @param id identifier passed as the first constructor argument of every PeerStatus created
+ * @return the parsed peers (an empty vector when the document has no non-empty "peers" array),
+ *         or std::nullopt when the text is not valid JSON or an Exception is thrown while processing it
+ */
+std::optional<std::vector<PeerStatus>> parsePeerStatuses(const std::shared_ptr<core::logging::Logger> &logger, const std::string &entity, const utils::Identifier &id) {
+  try {
+    rapidjson::Document root;
+    const rapidjson::ParseResult ok = root.Parse(entity.c_str());
+    if (ok.IsError()) {
+      throw Exception(ExceptionType::GENERAL_EXCEPTION,
+          "Failed to parse peer statuses from JSON string with reason: " + std::string(rapidjson::GetParseError_En(ok.Code()))
+          + " at offset " + std::to_string(ok.Offset()));
+    }
+
+    std::vector<PeerStatus> peer_statuses;
+    // HasMember() may only be called on an object, so rule out documents such as "[]" or "42" first.
+    if (!root.IsObject() || !root.HasMember("peers") || !root["peers"].IsArray() || root["peers"].Empty()) {
+      logger->log_debug("Peers is either not a member or is empty. String to analyze: {}", entity);
+      return peer_statuses;
+    }
+
+    for (const auto &peer : root["peers"].GetArray()) {
+      if (!peer.IsObject()) {
+        logger->log_debug("Skipping peer entry that is not a JSON object");
+        continue;
+      }
+
+      std::string hostname;
+      int port = 0;
+      int flow_file_count = 0;
+
+      // IsInt() rather than IsNumber(): GetInt() is only valid for values that fit in a 32-bit signed
+      // integer, so doubles and out-of-range numbers are treated as a missing port.
+      if (peer.HasMember("hostname") && peer["hostname"].IsString() &&
+          peer.HasMember("port") && peer["port"].IsInt()) {
+        hostname = peer["hostname"].GetString();
+        port = peer["port"].GetInt();
+      }
+
+      if (peer.HasMember("flowFileCount")) {
+        // Same reasoning as for the port: anything that is not representable as an int is ignored
+        // instead of tripping a rapidjson assertion or a narrowing error.
+        if (peer["flowFileCount"].IsInt()) {
+          flow_file_count = peer["flowFileCount"].GetInt();
+        } else {
+          logger->log_debug("Could not parse flowFileCount, so we're going to continue without it");
+        }
+      }
+
+      // host name and port are required.
+      if (!IsNullOrEmpty(hostname) && port > 0) {
+        peer_statuses.emplace_back(id, hostname, port, flow_file_count, true);
+      } else {
+        logger->log_debug("hostname empty or port is zero. hostname: {}, port: {}", hostname, port);
+      }
+    }
+    return peer_statuses;
+  } catch (const Exception &exception) {
+    logger->log_debug("Caught Exception {}", exception.what());
+    return std::nullopt;
+  }
+}
+}  // namespace
+
+/**
+ * Creates a new site-to-site transaction over HTTP for the given transfer direction.
+ *
+ * Sends a POST to the port's "transactions" resource. On a 201 response it checks the
+ * "x-location-uri-intent" and "Location" response headers, builds an HttpTransaction whose ID is
+ * parsed from the Location URL, opens the sending or receiving connection for it, installs that
+ * connection as the peer's stream and stores the transaction in known_transactions_.
+ *
+ * @param direction SEND targets the "input-ports" resource, anything else the "output-ports" resource
+ * @return the created transaction, or nullptr when the response code is not 201, the intent header is
+ *         not "transaction-url", the Location header is empty, or no transaction ID can be parsed from it
+ */
+std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(TransferDirection direction) {
+  const std::string port_kind = direction == TransferDirection::SEND ? "input-ports" : "output-ports";
+  std::stringstream transactions_uri;
+  transactions_uri << getBaseURI() << "data-transfer/" << port_kind << "/" << getPortId().to_string() << "/transactions";
+
+  auto create_request = createHttpClient(transactions_uri.str(), http::HttpRequestMethod::POST);
+  setSiteToSiteHeaders(*create_request);
+  create_request->setConnectionTimeout(std::chrono::milliseconds(5000));
+  create_request->setContentType("application/json");
+  create_request->setRequestHeader("Accept", "application/json");
+  create_request->setRequestHeader("Transfer-Encoding", "chunked");
+  create_request->setPostFields("");
+  create_request->submit();
+
+  if (auto* previous_stream = dynamic_cast<http::HttpStream*>(peer_->getStream())) {
+    logger_->log_debug("Closing {}", previous_stream->getClientRef()->getURL());
+  }
+
+  const auto response_code = create_request->getResponseCode();
+  if (response_code != 201) {
+    peer_->setStream(nullptr);
+    logger_->log_debug("Could not create transaction, received {}", response_code);
+    return nullptr;
+  }
+
+  // Validate the response headers before building the transaction.
+  auto intent_name = create_request->getHeaderValue("x-location-uri-intent");
+  if (!utils::string::equalsIgnoreCase(intent_name, "transaction-url")) {
+    logger_->log_debug("Could not create transaction, intent is {}", intent_name);
+    return nullptr;
+  }
+
+  auto url = create_request->getHeaderValue("Location");
+  if (IsNullOrEmpty(url)) {
+    logger_->log_debug("Location is empty");
+    return nullptr;
+  }
+
+  auto transaction = std::make_shared<HttpTransaction>(
+      direction, org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer>(gsl::make_not_null(peer_.get())));
+  transaction->initialize(this, url);
+
+  const auto transaction_id = parseTransactionId(url);
+  if (!transaction_id) {
+    logger_->log_debug("Transaction ID is empty");
+    return nullptr;
+  }
+  transaction->setTransactionId(*transaction_id);
+
+  std::shared_ptr<minifi::http::HTTPClient> transaction_client;
+  if (transaction->getDirection() != TransferDirection::SEND) {
+    transaction_client = openConnectionForReceive(transaction);
+    // 201 tells us that data is available. 200 would mean that nothing is available.
+    transaction->setDataAvailable(true);
+  } else {
+    transaction_client = openConnectionForSending(transaction);
+  }
+  gsl_Assert(transaction_client);
+
+  setSiteToSiteHeaders(*transaction_client);
+  peer_->setStream(std::make_unique<http::HttpStream>(transaction_client));
+  logger_->log_debug("Created transaction id -{}-", transaction->getUUID().to_string());
+  known_transactions_[transaction->getUUID()] = transaction;
+  return transaction;
+}
+
+std::optional<SiteToSiteResponse> 
HttpSiteToSiteClient::readResponseForReceiveTransfer(const 
std::shared_ptr<Transaction>& transaction) {
+  SiteToSiteResponse response;
+  if (current_code_ == ResponseCode::FINISH_TRANSACTION) {
+    return response;
+  }
+
+  if (transaction->getState() == TransactionState::TRANSACTION_STARTED || 
transaction->getState() == TransactionState::DATA_EXCHANGED) {
+    if (current_code_ == ResponseCode::CONFIRM_TRANSACTION && 
transaction->getState() == TransactionState::DATA_EXCHANGED) {
+      auto stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+      if (!stream->isFinished()) {
+        logger_->log_debug("confirm read for {}, but not finished ", 
transaction->getUUIDStr());
+        if (stream->waitForDataAvailable()) {
+          response.code = ResponseCode::CONTINUE_TRANSACTION;
+          return response;

Review Comment:
   This style:
   ```suggestion
             return SiteToSiteResponse{.code = 
ResponseCode::CONTINUE_TRANSACTION};
   ```
   would be more readable for me. As it is, with a `response` object created at 
the start and returned at various points, it's not clear whether the object has 
been modified earlier in the function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to