This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit d9a0415e4317c90a6aa54955a1256da795664f67
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Thu Oct 26 16:16:59 2023 +0200

    MINIFICPP-2210 Add C2 debug command to MiNiFi Controller
    
    Closes #1669
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 controller/Controller.cpp                          | 41 +++++++++++++++
 controller/Controller.h                            |  2 +
 controller/MiNiFiController.cpp                    | 20 ++++++-
 controller/tests/ControllerTests.cpp               | 55 +++++++++++++++++++
 .../test/integration/cluster/DockerCommunicator.py | 17 ++++++
 .../test/integration/cluster/DockerTestCluster.py  | 41 ++++++++++++++-
 .../cluster/MinifiControllerExecutor.py            |  9 ++++
 .../features/MiNiFi_integration_test_driver.py     |  3 ++
 .../integration/features/minifi_controller.feature |  7 +++
 docker/test/integration/features/steps/steps.py    | 10 ++++
 libminifi/include/c2/C2Utils.h                     |  5 ++
 libminifi/include/c2/ControllerSocketProtocol.h    |  2 +
 libminifi/src/c2/C2Agent.cpp                       | 57 +++++++-------------
 libminifi/src/c2/C2Utils.cpp                       | 30 +++++++++++
 libminifi/src/c2/ControllerSocketProtocol.cpp      | 61 +++++++++++++++++++---
 15 files changed, 311 insertions(+), 49 deletions(-)

diff --git a/controller/Controller.cpp b/controller/Controller.cpp
index 627d2558d..411c58ac2 100644
--- a/controller/Controller.cpp
+++ b/controller/Controller.cpp
@@ -17,6 +17,7 @@
 #include "Controller.h"
 
 #include <utility>
+#include <fstream>
 
 #include "io/BufferStream.h"
 #include "c2/C2Payload.h"
@@ -26,6 +27,7 @@
 #include "asio/connect.hpp"
 #include "core/logging/Logger.h"
 #include "utils/net/AsioSocketUtils.h"
+#include "utils/file/FileUtils.h"
 
 namespace org::apache::nifi::minifi::controller {
 
@@ -225,4 +227,63 @@ bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out) {
   return true;
 }
 
+/**
+ * Requests a debug bundle from the agent over the controller socket and stores it as
+ * <target_dir>/debug.tar.gz. target_dir is created if it does not exist yet.
+ * @return an empty expected on success, or an error message describing the failure
+ */
+nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir) {
+  // Validate the target first so that no bundle is downloaded which could not be stored anyway
+  if (std::filesystem::exists(target_dir) && !std::filesystem::is_directory(target_dir)) {
+    return nonstd::make_unexpected("Object specified as the target directory already exists and it is not a directory");
+  }
+
+  std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
+  if (connection_stream->initialize() < 0) {
+    return nonstd::make_unexpected("Could not connect to remote host " + socket_data.host + ":" + std::to_string(socket_data.port));
+  }
+  io::BufferStream buffer;
+  auto op = static_cast<uint8_t>(c2::Operation::transfer);
+  buffer.write(&op, 1);
+  buffer.write("debug");
+  if (io::isError(connection_stream->write(buffer.getBuffer()))) {
+    return nonstd::make_unexpected("Could not write to connection " + socket_data.host + ":" + std::to_string(socket_data.port));
+  }
+  size_t bundle_size = 0;
+  // Response layout: one operation byte, the bundle size, then the bundle bytes
+  if (io::isError(connection_stream->read(op)) || io::isError(connection_stream->read(bundle_size))) {
+    return nonstd::make_unexpected("Failed to read debug bundle response from " + socket_data.host + ":" + std::to_string(socket_data.port));
+  }
+  if (bundle_size == 0) {
+    return nonstd::make_unexpected("Failed to retrieve debug bundle");
+  }
+
+  if (!std::filesystem::exists(target_dir) && utils::file::create_dir(target_dir) != 0) {
+    return nonstd::make_unexpected("Failed to create target directory: " + target_dir.string());
+  }
+
+  const auto bundle_path = target_dir / "debug.tar.gz";
+  // binary mode: the archive bytes must not go through newline translation
+  std::ofstream out_file(bundle_path, std::ios::binary);
+  if (!out_file) {
+    return nonstd::make_unexpected("Failed to open output file: " + bundle_path.string());
+  }
+  const size_t BUFFER_SIZE = 4096;
+  std::array<char, BUFFER_SIZE> out_buffer{};
+  while (bundle_size > 0) {
+    const auto next_read_size = (std::min)(bundle_size, BUFFER_SIZE);
+    const auto size_read = connection_stream->read(std::as_writable_bytes(std::span(out_buffer).subspan(0, next_read_size)));
+    // an error value would underflow bundle_size and an empty read would loop forever
+    if (io::isError(size_read) || size_read == 0) {
+      return nonstd::make_unexpected("Connection broke while receiving the debug bundle");
+    }
+    bundle_size -= size_read;
+    out_file.write(out_buffer.data(), gsl::narrow<std::streamsize>(size_read));
+  }
+  if (!out_file) {
+    return nonstd::make_unexpected("Failed to write debug bundle to " + bundle_path.string());
+  }
+  return {};
+}
+
 }  // namespace org::apache::nifi::minifi::controller
diff --git a/controller/Controller.h b/controller/Controller.h
index dfc1661b6..aeb9e64c7 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -21,6 +21,7 @@
 #include <string>
 
 #include "utils/net/AsioSocketUtils.h"
+#include "utils/expected.h"
 
 namespace org::apache::nifi::minifi::controller {
 
@@ -35,5 +36,6 @@ bool listComponents(const utils::net::SocketData& 
socket_data, std::ostream &out
 bool listConnections(const utils::net::SocketData& socket_data, std::ostream 
&out, bool show_header = true);
 bool printManifest(const utils::net::SocketData& socket_data, std::ostream 
&out);
 bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out);
+nonstd::expected<void, std::string> getDebugBundle(const 
utils::net::SocketData& socket_data, const std::filesystem::path& target_dir);
 
 }  // namespace org::apache::nifi::minifi::controller
diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp
index 3975c41f1..f574c807c 100644
--- a/controller/MiNiFiController.cpp
+++ b/controller/MiNiFiController.cpp
@@ -19,6 +19,7 @@
 #include <iostream>
 #include <string>
 #include <string_view>
+#include <filesystem>
 
 #include "MainHelper.h"
 #include "properties/Configure.h"
@@ -158,8 +159,16 @@ int main(int argc, char **argv) {
   addFlagOption("--manifest", "Generates a manifest for the current binary");
   addFlagOption("--noheaders", "Removes headers from output streams");
 
+  argument_parser.add_argument("-d", "--debug").metavar("BUNDLE_OUT_DIR")
+    .help("Get debug bundle");
+
   bool show_headers = true;
 
+  if (argc <= 1) {
+    std::cerr << argument_parser;
+    std::exit(1);
+  }
+
   try {
     argument_parser.parse_args(argc, argv);
   } catch (const std::runtime_error& err) {
@@ -254,11 +263,22 @@ int main(int argc, char **argv) {
       if (!minifi::controller::getJstacks(socket_data, std::cout))
         std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;
     }
+
+    if (const auto& debug_path = argument_parser.present("--debug")) {
+      // Download the agent's debug bundle; it is stored as <BUNDLE_OUT_DIR>/debug.tar.gz
+      const std::filesystem::path debug_dir(*debug_path);
+      auto debug_res = minifi::controller::getDebugBundle(socket_data, debug_dir);
+      if (!debug_res)
+        std::cout << debug_res.error() << std::endl;
+      else
+        std::cout << "Debug bundle written to " << debug_dir / "debug.tar.gz" << std::endl;
+    }
   } catch (const std::exception &exc) {
     // catch anything thrown within try block that derives from std::exception
     std::cerr << exc.what() << std::endl;
+    std::exit(1);
   } catch (...) {
-    std::cerr << argument_parser;
+    std::cerr << "Caught unknown exception" << std::endl;
     std::exit(1);
   }
   return 0;
diff --git a/controller/tests/ControllerTests.cpp 
b/controller/tests/ControllerTests.cpp
index fdda4fbc7..a4907f225 100644
--- a/controller/tests/ControllerTests.cpp
+++ b/controller/tests/ControllerTests.cpp
@@ -20,6 +20,7 @@
 #include <utility>
 #include <string>
 #include <filesystem>
+#include <fstream>
 #include "range/v3/algorithm/find.hpp"
 
 #include "TestBase.h"
@@ -518,4 +519,61 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test jstack 
getter", "[controllerTests]
   REQUIRE(jstack_stream.str() == expected_trace);
 }
 
+// Retrieves a debug bundle over each connection type (one per SECTION) and checks that debug.tar.gz is written.
+TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle getter", 
"[controllerTests]") {
+  SECTION("With SSL from service provider") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
+  }
+
+  SECTION("With SSL from properties") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
+  }
+
+  SECTION("Without SSL") {
+    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
+  }
+
+  auto reporter = 
std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
+  auto response_node_loader = 
std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
+  reporter->initialize(configuration_, response_node_loader);
+  initalizeControllerSocket(reporter);
+
+  TestController test_controller;
+  auto output_dir = test_controller.createTempDirectory();
+  REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, 
output_dir));
+  REQUIRE(std::filesystem::exists(output_dir / "debug.tar.gz"));
+}
+
+// The target "subfolder" does not exist yet; getDebugBundle is expected to create it before writing the bundle.
+TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle is created to 
non-existent folder", "[controllerTests]") {
+  setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
+
+  auto reporter = 
std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
+  auto response_node_loader = 
std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
+  reporter->initialize(configuration_, response_node_loader);
+  initalizeControllerSocket(reporter);
+
+  TestController test_controller;
+  auto output_dir = test_controller.createTempDirectory() / "subfolder";
+  REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, 
output_dir));
+  REQUIRE(std::filesystem::exists(output_dir / "debug.tar.gz"));
+}
+
+// A target path that already exists as a regular file must be rejected with a descriptive error.
+TEST_CASE_METHOD(ControllerTestFixture, "Debug bundle retrieval fails if 
target path is an existing file", "[controllerTests]") {
+  setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
+
+  auto reporter = 
std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
+  auto response_node_loader = 
std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
+  reporter->initialize(configuration_, response_node_loader);
+  initalizeControllerSocket(reporter);
+
+  TestController test_controller;
+  auto invalid_path = test_controller.createTempDirectory() / "test.log";
+  std::ofstream file(invalid_path);
+  auto result = minifi::controller::getDebugBundle(controller_socket_data_, 
invalid_path);
+  REQUIRE(!result);
+  REQUIRE(result.error() == "Object specified as the target directory already 
exists and it is not a directory");
+}
+
 }  // namespace org::apache::nifi::minifi::test
diff --git a/docker/test/integration/cluster/DockerCommunicator.py 
b/docker/test/integration/cluster/DockerCommunicator.py
index 71acdd367..7d448cf34 100644
--- a/docker/test/integration/cluster/DockerCommunicator.py
+++ b/docker/test/integration/cluster/DockerCommunicator.py
@@ -19,6 +19,7 @@ import tempfile
 import tarfile
 import os
 import io
+import uuid
 
 
 class DockerCommunicator:
@@ -66,3 +67,26 @@ class DockerCommunicator:
                 tar.addfile(info, io.BytesIO(content.encode('utf-8')))
             with open(os.path.join(td, 'content.tar'), 'rb') as data:
                 return self.__put_archive(container_name, os.path.dirname(dst_path), data.read())
+
+    def copy_file_from_container(self, container_name, src_path_in_container, dest_dir_on_host) -> bool:
+        """Copy a file or directory out of a container and unpack it into dest_dir_on_host.
+
+        Returns True on success; logs the error and returns False otherwise."""
+        try:
+            container = self.client.containers.get(container_name)
+            (bits, _) = container.get_archive(src_path_in_container)
+            tmp_tar_path = os.path.join(dest_dir_on_host, "retrieved_file_" + str(uuid.uuid4()) + ".tar")
+            try:
+                with open(tmp_tar_path, 'wb') as out_file:
+                    for chunk in bits:
+                        out_file.write(chunk)
+                with tarfile.open(tmp_tar_path, 'r') as tar:
+                    tar.extractall(dest_dir_on_host)
+            finally:
+                # never leave the intermediate archive behind, even if writing or extraction fails
+                if os.path.exists(tmp_tar_path):
+                    os.remove(tmp_tar_path)
+            return True
+        except Exception as ex:
+            logging.error('Exception occurred while copying file from container: %s', str(ex))
+            return False
diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py
index e7146ed06..e850cc4bc 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -15,6 +15,11 @@
 import logging
 import time
 import re
+import tarfile
+import tempfile
+import os
+import gzip
+import shutil
 
 from .LogSource import LogSource
 from .ContainerStore import ContainerStore
@@ -340,7 +345,42 @@ class DockerTestCluster:
     def check_connection_size_through_controller(self, connection: str, size: 
int, max_size: int, container_name: str) -> bool:
         return self.minifi_controller_executor.get_connection_size(connection, 
container_name) == (size, max_size)
 
-    @retry_check(10, 1000)
+    @retry_check(10, 1)
     def manifest_can_be_retrieved_through_minifi_controller(self, 
container_name: str) -> bool:
         manifest = self.minifi_controller_executor.get_manifest(container_name)
         return '"agentManifest": {' in manifest and '"componentManifest": {' 
in manifest and '"agentType": "cpp"' in manifest
+
+    @retry_check(10, 1)
+    def debug_bundle_can_be_retrieved_through_minifi_controller(self, 
container_name: str) -> bool:
+        """Fetch a debug bundle via minificontroller and check it contains config.yml, minifi.properties and a minifi.log.gz with the 'MiNiFi started' entry."""
+        with tempfile.TemporaryDirectory() as td:
+            result = 
self.minifi_controller_executor.get_debug_bundle(container_name, td)
+            if not result:
+                logging.error("Failed to get debug bundle")
+                return False
+
+            with tarfile.open(os.path.join(td, "debug.tar.gz")) as file:
+                file.extractall(td)
+
+            if not os.path.exists(os.path.join(td, "config.yml")):
+                logging.error("config.yml file was not found in debug bundle")
+                return False
+
+            if not os.path.exists(os.path.join(td, "minifi.properties")):
+                logging.error("minifi.properties file was not found in debug 
bundle")
+                return False
+
+            if not os.path.exists(os.path.join(td, "minifi.log.gz")):
+                logging.error("minifi.log.gz file was not found in debug 
bundle")
+                return False
+
+            with gzip.open(os.path.join(td, "minifi.log.gz"), 'rb') as f_in:
+                with open(os.path.join(td, "minifi.log"), 'wb') as f_out:
+                    shutil.copyfileobj(f_in, f_out)
+
+            with open(os.path.join(td, "minifi.log")) as f:
+                if 'MiNiFi started' not in f.read():
+                    logging.error("'MiNiFi started' log entry was not found in 
minifi.log file")
+                    return False
+
+            return True
diff --git a/docker/test/integration/cluster/MinifiControllerExecutor.py 
b/docker/test/integration/cluster/MinifiControllerExecutor.py
index d7290df89..8da19835c 100644
--- a/docker/test/integration/cluster/MinifiControllerExecutor.py
+++ b/docker/test/integration/cluster/MinifiControllerExecutor.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 from .DockerCommunicator import DockerCommunicator
 
 
@@ -66,3 +67,12 @@ class MinifiControllerExecutor:
             if not line.startswith('['):
                 manifest += line
         return manifest
+
+    def get_debug_bundle(self, container_name: str, dest: str) -> bool:
+        """Run 'minificontroller --debug' inside the container, then copy the produced debug.tar.gz into the host directory dest."""
+        (code, _) = 
self.container_communicator.execute_command(container_name, 
["/opt/minifi/minifi-current/bin/minificontroller", "--debug", 
"/opt/minifi/minifi-current/"])
+        if code != 0:
+            logging.error("Minifi controller debug command failed with code: 
%d", code)
+            return False
+
+        return 
self.container_communicator.copy_file_from_container(container_name, 
"/opt/minifi/minifi-current/debug.tar.gz", dest)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py 
b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 5dc82e64c..10c4eac3d 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -448,3 +448,7 @@ class MiNiFi_integration_test:
 
     def enable_log_metrics_publisher_in_minifi(self):
         self.cluster.enable_log_metrics_publisher_in_minifi()
+
+    def debug_bundle_can_be_retrieved_through_minifi_controller(self, 
container_name: str):
+        """Assert that a debug bundle can be retrieved from the given container; calls cluster.log_app_output() when the check fails."""
+        assert 
self.cluster.debug_bundle_can_be_retrieved_through_minifi_controller(container_name)
 or self.cluster.log_app_output()
diff --git a/docker/test/integration/features/minifi_controller.feature 
b/docker/test/integration/features/minifi_controller.feature
index 486cb1370..c78f91ca4 100644
--- a/docker/test/integration/features/minifi_controller.feature
+++ b/docker/test/integration/features/minifi_controller.feature
@@ -74,3 +74,10 @@ Feature: MiNiFi Controller functionalities
     And controller socket properties are set up
     When all instances start up
     Then manifest can be retrieved through MiNiFi controller
+
+  Scenario: Debug bundle can be retrieved
+    Given a GenerateFlowFile processor
+    And a file with the content "test" is present in "/tmp/input"
+    And controller socket properties are set up
+    When all instances start up
+    Then debug bundle can be retrieved through MiNiFi controller
diff --git a/docker/test/integration/features/steps/steps.py 
b/docker/test/integration/features/steps/steps.py
index 72bad48c8..dba4c75f7 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -1204,3 +1204,15 @@ def step_impl(context, minifi_container_name: str):
 @then(u'manifest can be retrieved through MiNiFi controller')
 def step_impl(context):
     context.execute_steps(f"then manifest can be retrieved through MiNiFi 
controller in the \"minifi-cpp-flow-{context.feature_id}\" flow")
+
+
+# Asserts that a debug bundle can be retrieved through MiNiFi controller from the named container.
+@then(u'debug bundle can be retrieved through MiNiFi controller in the 
\"{minifi_container_name}\" flow')
+def step_impl(context, minifi_container_name: str):
+    
context.test.debug_bundle_can_be_retrieved_through_minifi_controller(minifi_container_name)
+
+
+# Shorthand for the step above, targeting the default "minifi-cpp-flow-<feature_id>" container.
+@then(u'debug bundle can be retrieved through MiNiFi controller')
+def step_impl(context):
+    context.execute_steps(f"then debug bundle can be retrieved through MiNiFi 
controller in the \"minifi-cpp-flow-{context.feature_id}\" flow")
diff --git a/libminifi/include/c2/C2Utils.h b/libminifi/include/c2/C2Utils.h
index ae879c043..f49b83b6c 100644
--- a/libminifi/include/c2/C2Utils.h
+++ b/libminifi/include/c2/C2Utils.h
@@ -20,9 +20,13 @@
 
 #include <string>
 #include <memory>
+#include <map>
 
 #include "properties/Configure.h"
 #include "utils/StringUtils.h"
+#include "io/ArchiveStream.h"
+#include "utils/expected.h"
+#include "io/BufferStream.h"
 
 namespace org::apache::nifi::minifi::c2 {
 
@@ -32,5 +36,6 @@ inline constexpr const char* 
CONTROLLER_SOCKET_METRICS_PUBLISHER = "ControllerSo
 
 bool isC2Enabled(const std::shared_ptr<Configure>& configuration);
 bool isControllerSocketEnabled(const std::shared_ptr<Configure>& 
configuration);
+nonstd::expected<std::shared_ptr<io::BufferStream>, std::string> 
createDebugBundleArchive(const std::map<std::string, 
std::unique_ptr<io::InputStream>>& files);
 
 }  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/include/c2/ControllerSocketProtocol.h 
b/libminifi/include/c2/ControllerSocketProtocol.h
index f6045b4e2..93c279a2b 100644
--- a/libminifi/include/c2/ControllerSocketProtocol.h
+++ b/libminifi/include/c2/ControllerSocketProtocol.h
@@ -51,12 +51,14 @@ class ControllerSocketProtocol {
   void handleStop(io::BaseStream &stream);
   void handleClear(io::BaseStream &stream);
   void handleUpdate(io::BaseStream &stream);
+  void handleTransfer(io::BaseStream &stream);
   void writeQueueSizesResponse(io::BaseStream &stream);
   void writeComponentsResponse(io::BaseStream &stream);
   void writeConnectionsResponse(io::BaseStream &stream);
   void writeGetFullResponse(io::BaseStream &stream);
   void writeManifestResponse(io::BaseStream &stream);
   void writeJstackResponse(io::BaseStream &stream);
+  void writeDebugBundleResponse(io::BaseStream &stream);
   void handleDescribe(io::BaseStream &stream);
   asio::awaitable<void> handleCommand(std::unique_ptr<io::BaseStream> stream);
   asio::awaitable<void> handshakeAndHandleCommand(asio::ip::tcp::socket&& 
socket, std::shared_ptr<minifi::controllers::SSLContextService> 
ssl_context_service);
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index eed786167..3bd7bc0e5 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -44,6 +44,7 @@
 #include "io/ArchiveStream.h"
 #include "io/StreamPipe.h"
 #include "utils/Id.h"
+#include "c2/C2Utils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -356,7 +357,7 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse &resp) {
     case Operation::start:
     case Operation::stop: {
       if (resp.name == "C2" || resp.name == "c2") {
-        raise(SIGTERM);
+        (void)raise(SIGTERM);
       }
 
       // stop all referenced components.
@@ -431,7 +432,7 @@ C2Payload C2Agent::prepareConfigurationOptions(const 
C2ContentResponse &resp) co
 }
 
 void C2Agent::handle_clear(const C2ContentResponse &resp) {
-  ClearOperand operand;
+  ClearOperand operand = ClearOperand::connection;
   try {
     operand = utils::enumCast<ClearOperand>(resp.name, true);
   } catch(const std::runtime_error&) {
@@ -486,7 +487,7 @@ void C2Agent::handle_clear(const C2ContentResponse &resp) {
  * to be put into the acknowledgement
  */
 void C2Agent::handle_describe(const C2ContentResponse &resp) {
-  DescribeOperand operand;
+  DescribeOperand operand = DescribeOperand::metrics;
   try {
     operand = utils::enumCast<DescribeOperand>(resp.name, true);
   } catch(const std::runtime_error&) {
@@ -588,7 +589,7 @@ void C2Agent::handle_describe(const C2ContentResponse 
&resp) {
 }
 
 void C2Agent::handle_update(const C2ContentResponse &resp) {
-  UpdateOperand operand;
+  UpdateOperand operand = UpdateOperand::configuration;
   try {
     operand = utils::enumCast<UpdateOperand>(resp.name, true);
   } catch(const std::runtime_error&) {
@@ -665,56 +666,34 @@ C2Agent::UpdateResult C2Agent::update_property(const std::string &property_name,
 }
 
 C2Payload C2Agent::bundleDebugInfo(std::map<std::string, std::unique_ptr<io::InputStream>>& files) {
-  C2Payload payload(Operation::transfer, false);
-  auto stream_provider = core::ClassLoader::getDefaultClassLoader().instantiate<io::ArchiveStreamProvider>(
-      "ArchiveStreamProvider", "ArchiveStreamProvider");
-  if (!stream_provider) {
-    throw C2DebugBundleError("Couldn't instantiate archiver provider");
-  }
-  auto bundle = std::make_shared<io::BufferStream>();
-  auto archiver = stream_provider->createWriteStream(9, "gzip", bundle, logger_);
-  if (!archiver) {
-    throw C2DebugBundleError("Couldn't instantiate archiver");
-  }
-  for (auto& [filename, stream] : files) {
-    size_t file_size = stream->size();
-    if (!archiver->newEntry({filename, file_size})) {
-      throw C2DebugBundleError("Couldn't initialize archive entry for '" + filename + "'");
-    }
-    if (gsl::narrow<int64_t>(file_size) != internal::pipe(*stream, *archiver)) {
-      // we have touched the input streams, they cannot be reused
-      throw C2DebugBundleError("Error while writing file '" + filename + "' into the debug bundle");
-    }
-  }
+  static constexpr const char* MANIFEST_FILE_NAME = "manifest.json";
   if (auto node_reporter = node_reporter_.lock()) {
-    static constexpr const char* MANIFEST_FILE_NAME = "manifest.json";
     auto reported_manifest = node_reporter->getAgentManifest();
     std::string manifest_str = state::response::SerializedResponseNode{
       .name = std::move(reported_manifest.name),
       .array = reported_manifest.is_array,
       .children = std::move(reported_manifest.serialized_nodes)
     }.to_pretty_string();
-    if (!archiver->newEntry({MANIFEST_FILE_NAME, manifest_str.size()})) {
-      throw C2DebugBundleError(fmt::format("Couldn't initialize archive entry for '{}'", MANIFEST_FILE_NAME));
-    }
-    io::BufferStream manifest_stream;
-    manifest_stream.write(as_bytes(std::span(manifest_str)));
-    if (gsl::narrow<int64_t>(manifest_stream.size()) != internal::pipe(manifest_stream, *archiver)) {
-      throw C2DebugBundleError(fmt::format("Error while writing file '{}'", MANIFEST_FILE_NAME));
-    }
+    // only bundle manifest.json when the agent manifest is available, never as an empty (invalid JSON) file
+    auto manifest_stream = std::make_unique<io::BufferStream>();
+    manifest_stream->write(as_bytes(std::span(manifest_str)));
+    files[MANIFEST_FILE_NAME] = std::move(manifest_stream);
   }
-  if (!archiver->finish()) {
-    throw C2DebugBundleError("Failed to complete debug bundle archive");
+  auto bundle = createDebugBundleArchive(files);
+  if (!bundle) {
+    throw C2DebugBundleError(bundle.error());
   }
+
   C2Payload file(Operation::transfer, true);
   file.setLabel("debug.tar.gz");
-  file.setRawData(bundle->moveBuffer());
+  file.setRawData(bundle.value()->moveBuffer());
+  C2Payload payload(Operation::transfer, false);
   payload.addPayload(std::move(file));
   return payload;
 }
 
 void C2Agent::handle_transfer(const C2ContentResponse &resp) {
-  TransferOperand operand;
+  TransferOperand operand = TransferOperand::debug;
   try {
     operand = utils::enumCast<TransferOperand>(resp.name, true);
   } catch(const std::runtime_error&) {
@@ -1030,7 +1009,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& 
resp) {
 
   {
     std::ofstream file{file_path, std::ofstream::binary};
-    file.write(reinterpret_cast<const char*>(raw_data.data()), 
raw_data.size());
+    file.write(reinterpret_cast<const char*>(raw_data.data()), 
gsl::narrow<std::streamsize>(raw_data.size()));
   }
 
   C2Payload response(Operation::acknowledge, 
state::UpdateState::FULLY_APPLIED, resp.ident, true);
diff --git a/libminifi/src/c2/C2Utils.cpp b/libminifi/src/c2/C2Utils.cpp
index 24b71fc8f..ae22fb07a 100644
--- a/libminifi/src/c2/C2Utils.cpp
+++ b/libminifi/src/c2/C2Utils.cpp
@@ -17,6 +17,9 @@
  */
 #include "c2/C2Utils.h"
 
+#include "core/ClassLoader.h"
+#include "io/StreamPipe.h"
+
 namespace org::apache::nifi::minifi::c2 {
 
 bool isC2Enabled(const std::shared_ptr<Configure>& configuration) {
@@ -31,4 +34,37 @@ bool isControllerSocketEnabled(const 
std::shared_ptr<Configure>& configuration)
   return 
utils::StringUtils::toBool(controller_socket_enable_str).value_or(false);
 }
 
+/**
+ * Packs every entry of `files` (entry name = map key) into a gzip-compressed archive
+ * (compression level 9) held in an in-memory BufferStream.
+ * The input streams are read while archiving, so they cannot be reused afterwards.
+ * @return the archive stream, or an error message naming the step that failed
+ */
+nonstd::expected<std::shared_ptr<io::BufferStream>, std::string> 
createDebugBundleArchive(const std::map<std::string, 
std::unique_ptr<io::InputStream>>& files) {
+  auto stream_provider = 
core::ClassLoader::getDefaultClassLoader().instantiate<io::ArchiveStreamProvider>(
+      "ArchiveStreamProvider", "ArchiveStreamProvider");
+  if (!stream_provider) {
+    return nonstd::make_unexpected("Couldn't instantiate archiver provider");
+  }
+  auto bundle = std::make_shared<io::BufferStream>();
+  // NOTE(review): no logger is passed here (nullptr); confirm createWriteStream tolerates a null logger.
+  auto archiver = stream_provider->createWriteStream(9, "gzip", bundle, 
nullptr);
+  if (!archiver) {
+    return nonstd::make_unexpected("Couldn't instantiate archiver");
+  }
+  for (const auto& [filename, stream] : files) {
+    size_t file_size = stream->size();
+    if (!archiver->newEntry({filename, file_size})) {
+      return nonstd::make_unexpected("Couldn't initialize archive entry for '" 
+ filename + "'");
+    }
+    if (gsl::narrow<int64_t>(file_size) != minifi::internal::pipe(*stream, 
*archiver)) {
+      // we have touched the input streams, they cannot be reused
+      return nonstd::make_unexpected("Error while writing file '" + filename + 
"' into the debug bundle");
+    }
+  }
+  if (!archiver->finish()) {
+    return nonstd::make_unexpected("Failed to complete debug bundle archive");
+  }
+  return bundle;
+}
+
 }  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp 
b/libminifi/src/c2/ControllerSocketProtocol.cpp
index e122da1d5..ea75df0e4 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -31,6 +31,7 @@
 #include "asio/ssl/stream.hpp"
 #include "asio/detached.hpp"
 #include "utils/net/AsioSocketUtils.h"
+#include "c2/C2Utils.h"
 
 namespace org::apache::nifi::minifi::c2 {
 
@@ -185,7 +186,7 @@ void ControllerSocketProtocol::handleStart(io::BaseStream 
&stream) {
       });
     }
   } else {
-    logger_->log_debug("Connection broke");
+    logger_->log_error("Connection broke");
   }
 }
 
@@ -197,7 +198,7 @@ void ControllerSocketProtocol::handleStop(io::BaseStream 
&stream) {
       component.stop();
     });
   } else {
-    logger_->log_debug("Connection broke");
+    logger_->log_error("Connection broke");
   }
 }
 
@@ -214,7 +215,7 @@ void ControllerSocketProtocol::handleUpdate(io::BaseStream 
&stream) {
   {
     const auto size = stream.read(what);
     if (io::isError(size)) {
-      logger_->log_debug("Connection broke");
+      logger_->log_error("Connection broke");
       return;
     }
   }
@@ -223,7 +224,7 @@ void ControllerSocketProtocol::handleUpdate(io::BaseStream 
&stream) {
     {
       const auto size = stream.read(ff_loc);
       if (io::isError(size)) {
-        logger_->log_debug("Connection broke");
+        logger_->log_error("Connection broke");
         return;
       }
     }
@@ -238,7 +239,7 @@ void 
ControllerSocketProtocol::writeQueueSizesResponse(io::BaseStream &stream) {
   std::string connection;
   const auto size_ = stream.read(connection);
   if (io::isError(size_)) {
-    logger_->log_debug("Connection broke");
+    logger_->log_error("Connection broke");
     return;
   }
   std::unordered_map<std::string, ControllerSocketReporter::QueueSize> sizes;
@@ -351,7 +352,7 @@ void 
ControllerSocketProtocol::handleDescribe(io::BaseStream &stream) {
   std::string what;
   const auto size = stream.read(what);
   if (io::isError(size)) {
-    logger_->log_debug("Connection broke");
+    logger_->log_error("Connection broke");
     return;
   }
   if (what == "queue") {
@@ -371,10 +372,51 @@ void ControllerSocketProtocol::handleDescribe(io::BaseStream &stream) {
   }
 }
 
+/**
+ * Sends the debug bundle to the controller client. Response layout: the transfer operation byte,
+ * the bundle size, then the bundle bytes. A bundle size of 0 signals that no bundle could be created.
+ */
+void ControllerSocketProtocol::writeDebugBundleResponse(io::BaseStream &stream) {
+  auto files = update_sink_.getDebugInfo();
+  auto bundle = createDebugBundleArchive(files);
+  io::BufferStream resp;
+  auto op = static_cast<uint8_t>(Operation::transfer);
+  resp.write(&op, 1);
+  if (!bundle) {
+    logger_->log_error("Creating debug bundle failed: {}", bundle.error());
+    resp.write(static_cast<size_t>(0));
+    stream.write(resp.getBuffer());
+    return;
+  }
+
+  // The archive is already held in memory: send the header, then the archive buffer as is,
+  // instead of copying the whole bundle chunk by chunk into the response buffer first.
+  const size_t bundle_size = bundle.value()->size();
+  resp.write(bundle_size);
+  if (io::isError(stream.write(resp.getBuffer())) || io::isError(stream.write(bundle.value()->getBuffer()))) {
+    logger_->log_error("Failed to send debug bundle of {} bytes", bundle_size);
+  }
+}
+
+// Handles a C2 transfer request; only the "debug" operand (debug bundle) is supported.
+void ControllerSocketProtocol::handleTransfer(io::BaseStream &stream) {
+  std::string what;
+  const auto size = stream.read(what);
+  if (io::isError(size)) {
+    logger_->log_error("Connection broke");
+    return;
+  }
+  if (what == "debug") {
+    writeDebugBundleResponse(stream);
+  } else {
+    logger_->log_error("Unknown C2 transfer parameter: {}", what);
+  }
+}
+
 asio::awaitable<void> ControllerSocketProtocol::handleCommand(std::unique_ptr<io::BaseStream> stream) {
-  uint8_t head;
+  uint8_t head = 0;
   if (stream->read(head) != 1) {
-    logger_->log_debug("Connection broke");
+    logger_->log_error("Connection broke");
     co_return;
   }
 
@@ -400,6 +442,9 @@ asio::awaitable<void> 
ControllerSocketProtocol::handleCommand(std::unique_ptr<io
     case Operation::describe:
       handleDescribe(*stream);
       break;
+    case Operation::transfer:
+      handleTransfer(*stream);
+      break;
     default:
       logger_->log_error("Unhandled C2 operation: {}", head);
   }

Reply via email to