szaszm commented on code in PR #1332:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1332#discussion_r876454396


##########
libminifi/src/c2/C2Agent.cpp:
##########
@@ -897,6 +901,113 @@ bool C2Agent::handleConfigurationUpdate(const 
C2ContentResponse &resp) {
   return true;
 }
 
+static auto make_path(const std::string& str) {
+  return std::filesystem::path(str);
+}
+
+static std::optional<std::string> validateFilePath(const 
std::filesystem::path& path) {
+  if (path.empty()) {
+    return "Empty file path";
+  }
+  if (!path.is_relative()) {
+    return "File path must be a relative path '" + path.string() + "'";
+  }
+  if (!path.has_filename()) {
+    return "Filename missing in output path '" + path.string() + "'";
+  }
+  if (path.filename() == "." || path.filename() == "..") {
+    return "Invalid filename '" + path.filename().string() + "'";
+  }
+  for (const auto& segment : path) {
+    if (segment == "..") {
+      return "Accessing parent directory is forbidden in file path '" + 
path.string() + "'";
+    }
+  }
+  return std::nullopt;
+}
+
+void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) {
+  auto send_error = [&] (std::string_view error) {
+    logger_->log_error("%s", std::string(error));
+    C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, 
resp.ident, true);
+    response.setRawData(gsl::span<const char>(error).as_span<const 
std::byte>());
+    enqueue_c2_response(std::move(response));
+  };
+  std::filesystem::path asset_dir = 
std::filesystem::path(configuration_->getHome()) / "asset";
+  if (auto asset_dir_str = 
configuration_->get(Configuration::nifi_asset_directory)) {
+    asset_dir = asset_dir_str.value();
+  }
+
+  // output file
+  std::filesystem::path file_path;
+  if (auto file_rel = resp.getArgument("file") | utils::map(make_path)) {
+    if (auto error = validateFilePath(file_rel.value())) {
+      send_error(error.value());
+      return;
+    }
+    file_path = asset_dir / file_rel.value();
+  } else {
+    send_error("Couldn't find 'file' argument");
+    return;
+  }
+
+  // source url
+  std::string url;
+  if (auto url_str = resp.getArgument("url")) {
+    if (auto resolved_url = resolveUrl(*url_str)) {
+      url = resolved_url.value();
+    } else {
+      send_error("Couldn't resolve url");
+      return;
+    }
+  } else {
+    send_error("Couldn't find 'url' argument");
+    return;
+  }
+
+  // forceDownload
+  bool force_download = false;
+  if (auto force_download_str = resp.getArgument("forceDownload")) {
+    if (utils::StringUtils::equalsIgnoreCase(force_download_str.value(), 
"true")) {
+      force_download = true;
+    } else if 
(utils::StringUtils::equalsIgnoreCase(force_download_str.value(), "false")) {
+      force_download = false;
+    } else {
+      send_error("Argument 'forceDownload' must be either 'true' or 'false'");
+      return;
+    }
+  }
+
+  if (!force_download && std::filesystem::exists(file_path)) {
+    logger_->log_info("File already exists");
+    C2Payload response(Operation::ACKNOWLEDGE, 
state::UpdateState::NO_OPERATION, resp.ident, true);
+    enqueue_c2_response(std::move(response));
+    return;
+  }
+
+  C2Payload&& file_response = protocol_.load()->consumePayload(url, 
C2Payload(Operation::TRANSFER, true), RECEIVE, false);

Review Comment:
   I know this is a pattern in this part of the codebase, but can we avoid 
relying on reference lifetime extension in new code when we can just as easily 
just make a proper object?
   ```suggestion
     C2Payload file_response = protocol_.load()->consumePayload(url, 
C2Payload(Operation::TRANSFER, true), RECEIVE, false);
   ```



##########
extensions/http-curl/tests/C2UpdateAssetTest.cpp:
##########
@@ -0,0 +1,260 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <vector>
+#include <string>
+#include <fstream>
+#include <iterator>
+
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
+#include "utils/Environment.h"
+
+class FileProvider : public ServerAwareHandler {
+ public:
+  explicit FileProvider(std::string file_content): 
file_content_(std::move(file_content)) {}
+
+  bool handleGet(CivetServer* /*server*/, struct mg_connection* conn) override 
{
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                    "text/plain\r\nContent-Length: %lu\r\nConnection: 
close\r\n\r\n",
+              file_content_.length());
+    mg_printf(conn, "%s", file_content_.c_str());
+    return true;
+  }
+
+ private:
+  std::string file_content_;
+};
+
+class C2HeartbeatHandler : public HeartbeatHandler {
+ public:
+  using HeartbeatHandler::HeartbeatHandler;
+
+  bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) 
override {
+    std::lock_guard<std::mutex> guard(op_mtx_);
+    sendHeartbeatResponse(operations_, conn);
+    operations_.clear();
+    return true;
+  }
+
+  void addOperation(std::string id, std::unordered_map<std::string, 
std::string> args) {
+    std::lock_guard<std::mutex> guard(op_mtx_);
+    operations_.push_back(C2Operation{
+      .operation = "update",
+      .operand = "asset",
+      .operation_id = std::move(id),
+      .args = std::move(args)
+    });
+  }
+
+ private:
+  std::mutex op_mtx_;
+  std::vector<C2Operation> operations_;
+};
+
+class VerifyC2AssetUpdate : public VerifyC2Base {
+ public:
+  void configureC2() override {
+    configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
+    configuration->set("nifi.c2.enable", "true");
+    configuration->set("nifi.c2.agent.heartbeat.period", "100");

Review Comment:
   Consider slightly increasing the heartbeat period. The C2Agent consumer poll 
rate (C2Agent.h `C2RESPONSE_POLL_MS`) is once every 100ms, and it can happen 
that the consumer can't keep up with the producer, introducing some latency. I 
had this issue with the restart after property update, that never happened, 
because there were always tasks left to do in the consumer queue.
   It wouldn't break the test here, but I'd like to avoid this problem if 
possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to