fgerlits commented on code in PR #1497:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1497#discussion_r1107133908


##########
libminifi/include/c2/C2MetricsPublisher.h:
##########
@@ -24,69 +24,53 @@
 #include <optional>
 #include <string>
 #include <vector>
-#include <unordered_set>
+#include <functional>
 
 #include "c2/C2Agent.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "properties/Configure.h"
 #include "core/logging/Logger.h"
 #include "core/state/nodes/MetricsBase.h"
-#include "core/Repository.h"
-#include "core/ContentRepository.h"
 #include "core/ProcessGroup.h"
-#include "core/Flow.h"
+#include "core/Core.h"
 #include "utils/file/FileSystem.h"
 #include "core/state/nodes/ResponseNodeLoader.h"
 #include "utils/Id.h"
+#include "core/state/MetricsPublisher.h"
 
 namespace org::apache::nifi::minifi::c2 {
 
-class C2Client : public core::Flow, public state::response::NodeReporter {
+class C2MetricsPublisher : public state::response::NodeReporter, public 
state::MetricsPublisher {
  public:
-  C2Client(
-      std::shared_ptr<Configure> configuration, 
std::shared_ptr<core::Repository> provenance_repo,
-      std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::ContentRepository> content_repo,
-      std::unique_ptr<core::FlowConfiguration> flow_configuration, 
std::shared_ptr<utils::file::FileSystem> filesystem,
-      std::function<void()> request_restart,
-      std::shared_ptr<core::logging::Logger> logger = 
core::logging::LoggerFactory<C2Client>::getLogger());
+  using MetricsPublisher::MetricsPublisher;
+
+  MINIFIAPI static constexpr const char* Description = "Class that provides C2 
metrics to the C2Agent";
 
-  void initialize(core::controller::ControllerServiceProvider *controller, 
state::Pausable *pause_handler, state::StateMonitor* update_sink);
   std::optional<state::response::NodeReporter::ReportedNode> 
getMetricsNode(const std::string& metrics_class) const override;
   std::vector<state::response::NodeReporter::ReportedNode> 
getHeartbeatNodes(bool include_manifest) const override;
 
-  void stopC2();
-  void initializeResponseNodes(core::ProcessGroup* root);
-  void clearResponseNodes();
+  /**
+   * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE 
manifest
+   * @return the agent manifest response node
+   */

Review Comment:
   this comment is the same as in the parent class, so I think it could be 
removed



##########
libminifi/src/core/state/MetricsPublisherFactory.cpp:
##########
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/state/MetricsPublisherFactory.h"
+
+namespace org::apache::nifi::minifi::state {
+
+std::unique_ptr<MetricsPublisher> createMetricsPublisher(const std::string& 
name, const std::shared_ptr<Configure>& configuration,
+    const std::shared_ptr<state::response::ResponseNodeLoader>& 
response_node_loader) {
+  auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, 
name);
+  if (!ptr) {
+    throw std::runtime_error("Configured metrics publisher class \"" + name + 
"\" could not be instantiated.");
+  }
+
+  auto metrics_publisher = 
utils::dynamic_unique_cast<MetricsPublisher>(std::move(ptr));
+  if (!metrics_publisher) {
+    throw std::runtime_error("Configured metrics publisher class \"" + name + 
"\" could not be instantiated.");

Review Comment:
   it would be better to have different exception messages in the two cases, eg:
   ```suggestion
       throw std::runtime_error("Configured metrics publisher class \"" + name 
+ "\" is not a MetricsPublisher.");
   ```



##########
libminifi/src/core/state/MetricsPublisherFactory.cpp:
##########
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/state/MetricsPublisherFactory.h"
+
+namespace org::apache::nifi::minifi::state {
+
+std::unique_ptr<MetricsPublisher> createMetricsPublisher(const std::string& 
name, const std::shared_ptr<Configure>& configuration,

Review Comment:
   These two overloads are different: the second can return null, the first 
can't.  In `MetricsPublisherStore::initialize`, we check the return value for 
nullness, which is not necessary.  We could change the return value of this 
overload to `gsl::not_null<...>` so the caller can see if a nullness check is 
needed.



##########
libminifi/include/c2/C2Utils.h:
##########
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "properties/Configure.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::c2 {
+
+static constexpr const char* UPDATE_NAME = "C2UpdatePolicy";
+static constexpr const char* C2_METRICS_PUBLISHER = "C2MetricsPublisher";

Review Comment:
   Nitpicking, but `static` is not needed at namespace scope, as it is implied 
by `constexpr`.  In fact, we may want to do the opposite and declare these 
`inline constexpr ...`, so the linker will keep one copy only, instead of 
creating a separate copy each time this header is included in a cpp file.



##########
libminifi/src/core/state/nodes/ResponseNodeLoader.cpp:
##########
@@ -25,89 +25,129 @@
 #include "core/state/nodes/QueueMetrics.h"
 #include "core/state/nodes/AgentInformation.h"
 #include "core/state/nodes/ConfigurationChecksums.h"
-#include "c2/C2Agent.h"
 #include "utils/gsl.h"
 #include "utils/RegexUtils.h"
 #include "utils/StringUtils.h"
+#include "c2/C2Utils.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
 ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> 
configuration, std::shared_ptr<core::Repository> provenance_repo,
-    std::shared_ptr<core::Repository> flow_file_repo, core::FlowConfiguration* 
flow_configuration)
+    std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::FlowConfiguration> flow_configuration)
   : configuration_(std::move(configuration)),
     provenance_repo_(std::move(provenance_repo)),
     flow_file_repo_(std::move(flow_file_repo)),
-    flow_configuration_(flow_configuration) {
+    flow_configuration_(std::move(flow_configuration)) {
 }
 
-void ResponseNodeLoader::initializeComponentMetrics(core::ProcessGroup* root) {
+void ResponseNodeLoader::clearConfigRoot() {
+  {
+    std::lock_guard<std::mutex> guard(system_metrics_mutex_);
+    system_metrics_.clear();
+  }
+  {
+    std::lock_guard<std::mutex> guard(component_metrics_mutex_);
+    component_metrics_.clear();
+  }
+  {
+    std::lock_guard<std::mutex> guard(root_mutex_);
+    root_ = nullptr;
+  }
+}
+
+void ResponseNodeLoader::setNewConfigRoot(core::ProcessGroup* root) {
+  {
+    std::lock_guard<std::mutex> guard(root_mutex_);
+    root_ = root;
+  }
+  initializeComponentMetrics();
+}
+
+void ResponseNodeLoader::initializeComponentMetrics() {
   {
     std::lock_guard<std::mutex> guard(component_metrics_mutex_);
     component_metrics_.clear();
   }
 
-  if (!root) {
+  std::lock_guard<std::mutex> guard(root_mutex_);
+  if (!root_) {
     return;
   }
 
   std::vector<core::Processor*> processors;
-  root->getAllProcessors(processors);
+  root_->getAllProcessors(processors);
   for (const auto processor : processors) {
     auto node_source = dynamic_cast<ResponseNodeSource*>(processor);
     if (node_source == nullptr) {
       continue;
     }
     // we have a metrics source.
-    auto metric = node_source->getResponseNodes();
+    auto metric = node_source->getResponseNode();
     std::lock_guard<std::mutex> guard(component_metrics_mutex_);
     component_metrics_[metric->getName()].push_back(metric);
   }
 }
 
-std::vector<std::shared_ptr<ResponseNode>> 
ResponseNodeLoader::getResponseNodes(const std::string& clazz) const {
-  std::shared_ptr<core::CoreComponent> ptr = 
core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-  if (ptr == nullptr) {
-    return getComponentMetricsNodes(clazz);
+nonstd::expected<SharedResponseNode, std::string> 
ResponseNodeLoader::getSystemMetricsNode(const std::string& clazz) {
+  std::lock_guard<std::mutex> guard(system_metrics_mutex_);
+  if (system_metrics_.contains(clazz)) {
+    return system_metrics_.at(clazz);
   }
+
+  std::shared_ptr ptr = 
core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
   auto response_node = std::dynamic_pointer_cast<ResponseNode>(ptr);
   if (!response_node) {
-    logger_->log_error("Instantiated class '%s' is not a ResponseNode!", 
clazz);
+    return nonstd::make_unexpected("Instantiated class '" + clazz + "' is not 
a ResponseNode!");
+  }
+  system_metrics_.emplace(clazz, gsl::make_not_null(response_node));
+  return system_metrics_.at(clazz);
+}
+
+std::vector<SharedResponseNode> ResponseNodeLoader::getResponseNodes(const 
std::string& clazz) {
+  auto component_metrics = getComponentMetricsNodes(clazz);
+  if (!component_metrics.empty()) {
+    return component_metrics;
+  }
+  auto response_node = getSystemMetricsNode(clazz);
+  if (!response_node) {
+    logger_->log_error(response_node.error().c_str());
     return {};
   }
-  return {response_node};
+  return {*response_node};
 }
 
-void ResponseNodeLoader::initializeRepositoryMetrics(const 
std::shared_ptr<ResponseNode>& response_node) const {
+void ResponseNodeLoader::initializeRepositoryMetrics(const SharedResponseNode& 
response_node) const {
   auto repository_metrics = 
dynamic_cast<RepositoryMetrics*>(response_node.get());
   if (repository_metrics != nullptr) {
     repository_metrics->addRepository(provenance_repo_);
     repository_metrics->addRepository(flow_file_repo_);
   }
 }
 
-void ResponseNodeLoader::initializeQueueMetrics(const 
std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) {
-  if (!root) {
+void ResponseNodeLoader::initializeQueueMetrics(const SharedResponseNode& 
response_node) const {
+  std::lock_guard<std::mutex> guard(root_mutex_);

Review Comment:
   I would reduce the scope of this lock to just the `getConnections()` call, 
too.



##########
libminifi/include/FlowController.h:
##########
@@ -216,27 +185,29 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
   std::unique_ptr<FlowControlProtocol> protocol_;
   std::chrono::steady_clock::time_point start_time_;
 
- private:
+ protected:

Review Comment:
   could this be private?



##########
libminifi/src/core/state/nodes/ResponseNodeLoader.cpp:
##########
@@ -25,89 +25,129 @@
 #include "core/state/nodes/QueueMetrics.h"
 #include "core/state/nodes/AgentInformation.h"
 #include "core/state/nodes/ConfigurationChecksums.h"
-#include "c2/C2Agent.h"
 #include "utils/gsl.h"
 #include "utils/RegexUtils.h"
 #include "utils/StringUtils.h"
+#include "c2/C2Utils.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
 ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> 
configuration, std::shared_ptr<core::Repository> provenance_repo,
-    std::shared_ptr<core::Repository> flow_file_repo, core::FlowConfiguration* 
flow_configuration)
+    std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::FlowConfiguration> flow_configuration)
   : configuration_(std::move(configuration)),
     provenance_repo_(std::move(provenance_repo)),
     flow_file_repo_(std::move(flow_file_repo)),
-    flow_configuration_(flow_configuration) {
+    flow_configuration_(std::move(flow_configuration)) {
 }
 
-void ResponseNodeLoader::initializeComponentMetrics(core::ProcessGroup* root) {
+void ResponseNodeLoader::clearConfigRoot() {
+  {
+    std::lock_guard<std::mutex> guard(system_metrics_mutex_);
+    system_metrics_.clear();
+  }
+  {
+    std::lock_guard<std::mutex> guard(component_metrics_mutex_);
+    component_metrics_.clear();
+  }
+  {
+    std::lock_guard<std::mutex> guard(root_mutex_);
+    root_ = nullptr;
+  }
+}
+
+void ResponseNodeLoader::setNewConfigRoot(core::ProcessGroup* root) {
+  {
+    std::lock_guard<std::mutex> guard(root_mutex_);
+    root_ = root;
+  }
+  initializeComponentMetrics();
+}
+
+void ResponseNodeLoader::initializeComponentMetrics() {
   {
     std::lock_guard<std::mutex> guard(component_metrics_mutex_);
     component_metrics_.clear();
   }
 
-  if (!root) {
+  std::lock_guard<std::mutex> guard(root_mutex_);
+  if (!root_) {
     return;
   }
 
   std::vector<core::Processor*> processors;
-  root->getAllProcessors(processors);
+  root_->getAllProcessors(processors);

Review Comment:
   could we reduce the scope of this lock?  e.g:
   ```c++
     std::vector<core::Processor*> processors;
     {
       std::lock_guard<std::mutex> guard(root_mutex_);
       if (!root_) {
         return;
       }
       root_->getAllProcessors(processors);
     }
   ```



##########
libminifi/test/flow-tests/TestControllerWithFlow.h:
##########
@@ -64,14 +65,12 @@ class TestControllerWithFlow: public TestController {
     REQUIRE(content_repo->initialize(configuration_));
     std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration_);
 
-    auto flow = 
std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{ff_repo, 
content_repo, stream_factory, configuration_, yaml_path_.string()});
+    auto flow = 
std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{ff_repo, 
content_repo, stream_factory, configuration_, yaml_path_.string()});
     auto root = flow->getRoot();
     root_ = root.get();
-    controller_ = std::make_shared<minifi::FlowController>(
-        prov_repo, ff_repo, configuration_,
-        std::move(flow),
-        content_repo, DEFAULT_ROOT_GROUP_NAME,
-        std::make_shared<utils::file::FileSystem>(), []{});
+    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration_, 
prov_repo, ff_repo, flow);
+    metrics_publisher_store_ = metrics_publisher_store.get();

Review Comment:
   Why do we save `metrics_publisher_store_`?  I couldn't find any place where 
we use it.  Also, saving a bare pointer to an object owned by someone else 
looks dangerous.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to