This is an automated email from the ASF dual-hosted git repository.
lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new e3728f117 MINIFICPP-2613 - Move base metrics to libminifi
e3728f117 is described below
commit e3728f117446267e086689faa1889fc0ff19ccfb
Author: Adam Debreceni <[email protected]>
AuthorDate: Wed Sep 3 07:47:56 2025 +0200
MINIFICPP-2613 - Move base metrics to libminifi
Signed-off-by: Gabor Gyimesi <[email protected]>
This closes #2020
---
core-framework/include/core/ProcessorImpl.h | 9 ++-
core-framework/src/core/ProcessorImpl.cpp | 2 -
.../llamacpp/processors/RunLlamaCppInference.cpp | 4 +-
.../llamacpp/processors/RunLlamaCppInference.h | 24 ++++----
extensions/mqtt/processors/PublishMQTT.cpp | 17 +++---
extensions/mqtt/processors/PublishMQTT.h | 6 +-
.../standard-processors/processors/GetFile.cpp | 2 +-
.../standard-processors/processors/GetFile.h | 23 +++-----
libminifi/include/core/ProcessSession.h | 4 +-
libminifi/include/core/Processor.h | 5 +-
.../include/core/ProcessorMetrics.h | 67 +++++++++++-----------
libminifi/include/core/state/nodes/MetricsBase.h | 6 ++
.../include/core/state/nodes/ResponseNode.h | 0
libminifi/src/EventDrivenSchedulingAgent.cpp | 4 +-
libminifi/src/core/Processor.cpp | 19 ++++--
.../src/core/ProcessorMetrics.cpp | 48 ++++++++++------
libminifi/test/libtest/unit/TestBase.cpp | 6 +-
libminifi/test/unit/MetricsTests.cpp | 8 +--
.../include/minifi-cpp/core/ProcessSession.h | 3 -
minifi-api/include/minifi-cpp/core/ProcessorApi.h | 8 +--
.../include/minifi-cpp/core/ProcessorMetrics.h | 57 ------------------
.../minifi-cpp/core/ProcessorMetricsExtension.h | 31 ++++++++++
.../minifi-cpp/core/state/nodes/MetricsBase.h | 6 --
23 files changed, 170 insertions(+), 189 deletions(-)
diff --git a/core-framework/include/core/ProcessorImpl.h b/core-framework/include/core/ProcessorImpl.h
index 44fec377c..cf409012a 100644
--- a/core-framework/include/core/ProcessorImpl.h
+++ b/core-framework/include/core/ProcessorImpl.h
@@ -33,8 +33,7 @@
#include "minifi-cpp/core/Annotation.h"
#include "minifi-cpp/core/DynamicProperty.h"
#include "minifi-cpp/core/Scheduling.h"
-#include "minifi-cpp/core/state/nodes/MetricsBase.h"
-#include "minifi-cpp/core/ProcessorMetrics.h"
+#include "minifi-cpp/core/ProcessorMetricsExtension.h"
#include "minifi-cpp/utils/gsl.h"
#include "utils/Id.h"
#include "minifi-cpp/core/OutputAttributeDefinition.h"
@@ -118,8 +117,8 @@ class ProcessorImpl : public virtual ProcessorApi {
annotation::Input getInputRequirement() const override = 0;
- gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const override
{
- return metrics_;
+ std::shared_ptr<ProcessorMetricsExtension> getMetricsExtension() const
override {
+ return metrics_extension_;
}
static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};
@@ -142,7 +141,7 @@ class ProcessorImpl : public virtual ProcessorApi {
std::atomic<bool> trigger_when_empty_;
- gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
+ std::shared_ptr<ProcessorMetricsExtension> metrics_extension_;
std::shared_ptr<logging::Logger> logger_;
diff --git a/core-framework/src/core/ProcessorImpl.cpp b/core-framework/src/core/ProcessorImpl.cpp
index 4c349b6a1..c45c0ff8f 100644
--- a/core-framework/src/core/ProcessorImpl.cpp
+++ b/core-framework/src/core/ProcessorImpl.cpp
@@ -36,7 +36,6 @@
#include "range/v3/algorithm/any_of.hpp"
#include "fmt/format.h"
#include "minifi-cpp/Exception.h"
-#include "core/ProcessorMetrics.h"
#include "minifi-cpp/core/ProcessorDescriptor.h"
using namespace std::literals::chrono_literals;
@@ -46,7 +45,6 @@ namespace org::apache::nifi::minifi::core {
ProcessorImpl::ProcessorImpl(ProcessorMetadata metadata)
: metadata_(std::move(metadata)),
trigger_when_empty_(false),
- metrics_(std::make_shared<ProcessorMetricsImpl>(*this)),
logger_(metadata_.logger) {
logger_->log_debug("Processor {} created with uuid {}", getName(),
getUUIDStr());
}
diff --git a/extensions/llamacpp/processors/RunLlamaCppInference.cpp b/extensions/llamacpp/processors/RunLlamaCppInference.cpp
index 4b907e352..2524cfb8a 100644
--- a/extensions/llamacpp/processors/RunLlamaCppInference.cpp
+++ b/extensions/llamacpp/processors/RunLlamaCppInference.cpp
@@ -64,13 +64,13 @@ void RunLlamaCppInference::onSchedule(core::ProcessContext&
context, core::Proce
}
void RunLlamaCppInference::increaseTokensIn(uint64_t token_count) {
- auto* const llamacpp_metrics =
dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_.get());
+ auto* const llamacpp_metrics =
dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_extension_.get());
gsl_Assert(llamacpp_metrics);
llamacpp_metrics->tokens_in += token_count;
}
void RunLlamaCppInference::increaseTokensOut(uint64_t token_count) {
- auto* const llamacpp_metrics =
dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_.get());
+ auto* const llamacpp_metrics =
dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_extension_.get());
gsl_Assert(llamacpp_metrics);
llamacpp_metrics->tokens_out += token_count;
}
diff --git a/extensions/llamacpp/processors/RunLlamaCppInference.h b/extensions/llamacpp/processors/RunLlamaCppInference.h
index c2d11fe45..b82e196ed 100644
--- a/extensions/llamacpp/processors/RunLlamaCppInference.h
+++ b/extensions/llamacpp/processors/RunLlamaCppInference.h
@@ -24,36 +24,34 @@
#include "core/logging/LoggerFactory.h"
#include "core/PropertyDefinitionBuilder.h"
#include "LlamaContext.h"
-#include "core/ProcessorMetrics.h"
+#include "minifi-cpp/core/ProcessorMetricsExtension.h"
+#include "core/state/Value.h"
namespace org::apache::nifi::minifi::extensions::llamacpp::processors {
using LlamaContextProvider =
std::function<std::unique_ptr<LlamaContext>(const std::filesystem::path&
model_path, const LlamaSamplerParams& llama_sampler_params, const
LlamaContextParams& llama_ctx_params)>;
-class RunLlamaCppInferenceMetrics : public core::ProcessorMetricsImpl {
+class RunLlamaCppInferenceMetrics : public core::ProcessorMetricsExtension {
public:
- explicit RunLlamaCppInferenceMetrics(const core::ProcessorImpl&
source_processor)
- : core::ProcessorMetricsImpl(source_processor) {
- }
+ RunLlamaCppInferenceMetrics() = default;
std::vector<state::response::SerializedResponseNode> serialize() override {
- auto resp = core::ProcessorMetricsImpl::serialize();
- auto& root_node = resp[0];
+ std::vector<state::response::SerializedResponseNode> resp;
state::response::SerializedResponseNode tokens_in_node{"TokensIn",
tokens_in.load()};
- root_node.children.push_back(tokens_in_node);
+ resp.push_back(tokens_in_node);
state::response::SerializedResponseNode tokens_out_node{"TokensOut",
tokens_out.load()};
- root_node.children.push_back(tokens_out_node);
+ resp.push_back(tokens_out_node);
return resp;
}
std::vector<state::PublishedMetric> calculateMetrics() override {
- auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
- metrics.push_back({"tokens_in", static_cast<double>(tokens_in.load()),
getCommonLabels()});
- metrics.push_back({"tokens_out", static_cast<double>(tokens_out.load()),
getCommonLabels()});
+ std::vector<state::PublishedMetric> metrics;
+ metrics.push_back({"tokens_in", static_cast<double>(tokens_in.load()),
{}});
+ metrics.push_back({"tokens_out", static_cast<double>(tokens_out.load()),
{}});
return metrics;
}
@@ -66,7 +64,7 @@ class RunLlamaCppInference : public core::ProcessorImpl {
explicit RunLlamaCppInference(core::ProcessorMetadata metadata,
LlamaContextProvider llama_context_provider = {})
: core::ProcessorImpl(metadata),
llama_context_provider_(std::move(llama_context_provider)) {
- metrics_ =
gsl::make_not_null(std::make_shared<RunLlamaCppInferenceMetrics>(*this));
+ metrics_extension_ =
gsl::make_not_null(std::make_shared<RunLlamaCppInferenceMetrics>());
}
~RunLlamaCppInference() override = default;
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp
index 1952dc057..9b8fe786d 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -326,25 +326,22 @@ uint16_t
PublishMQTT::InFlightMessageCounter::getCounter() const {
return counter_;
}
-PublishMQTT::PublishMQTTMetrics::PublishMQTTMetrics(const core::ProcessorImpl&
source_processor, const InFlightMessageCounter& in_flight_message_counter)
- : core::ProcessorMetricsImpl(source_processor),
- in_flight_message_counter_(&in_flight_message_counter) {
+PublishMQTT::PublishMQTTMetrics::PublishMQTTMetrics(const
InFlightMessageCounter& in_flight_message_counter)
+ : in_flight_message_counter_(&in_flight_message_counter) {
}
std::vector<state::response::SerializedResponseNode>
PublishMQTT::PublishMQTTMetrics::serialize() {
- auto metrics_vector = core::ProcessorMetricsImpl::serialize();
- gsl_Expects(!metrics_vector.empty());
- auto& metrics = metrics_vector[0];
+ std::vector<state::response::SerializedResponseNode> metrics;
state::response::SerializedResponseNode
in_flight_message_count_node{"InFlightMessageCount",
static_cast<uint32_t>(in_flight_message_counter_->getCounter())};
- metrics.children.push_back(in_flight_message_count_node);
+ metrics.push_back(in_flight_message_count_node);
- return metrics_vector;
+ return metrics;
}
std::vector<state::PublishedMetric>
PublishMQTT::PublishMQTTMetrics::calculateMetrics() {
- auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
- metrics.push_back({"in_flight_message_count",
static_cast<double>(in_flight_message_counter_->getCounter()),
getCommonLabels()});
+ std::vector<state::PublishedMetric> metrics;
+ metrics.push_back({"in_flight_message_count",
static_cast<double>(in_flight_message_counter_->getCounter()), {}});
return metrics;
}
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index 4e0145486..24fa72e16 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -41,7 +41,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
public:
explicit PublishMQTT(core::ProcessorMetadata metadata)
: processors::AbstractMQTTProcessor(metadata) {
- metrics_ = gsl::make_not_null(std::make_shared<PublishMQTTMetrics>(*this,
in_flight_message_counter_));
+ metrics_extension_ =
gsl::make_not_null(std::make_shared<PublishMQTTMetrics>(in_flight_message_counter_));
}
EXTENSIONAPI static constexpr const char* Description = "PublishMQTT
serializes FlowFile content as an MQTT payload, sending the message to the
configured topic and broker.";
@@ -111,9 +111,9 @@ class PublishMQTT : public
processors::AbstractMQTTProcessor {
uint16_t limit_{MQTT_MAX_RECEIVE_MAXIMUM};
};
- class PublishMQTTMetrics : public core::ProcessorMetricsImpl {
+ class PublishMQTTMetrics : public core::ProcessorMetricsExtension {
public:
- PublishMQTTMetrics(const core::ProcessorImpl& source_processor, const
InFlightMessageCounter& in_flight_message_counter);
+ explicit PublishMQTTMetrics(const InFlightMessageCounter&
in_flight_message_counter);
std::vector<state::response::SerializedResponseNode> serialize() override;
std::vector<state::PublishedMetric> calculateMetrics() override;
diff --git a/extensions/standard-processors/processors/GetFile.cpp b/extensions/standard-processors/processors/GetFile.cpp
index f325b4cc3..f0abaeaa5 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -178,7 +178,7 @@ bool GetFile::fileMatchesRequestCriteria(const
std::filesystem::path& full_name,
return false;
}
- auto* const getfile_metrics = dynamic_cast<GetFileMetrics*>(metrics_.get());
+ auto* const getfile_metrics =
dynamic_cast<GetFileMetrics*>(metrics_extension_.get());
gsl_Assert(getfile_metrics);
getfile_metrics->input_bytes += file_size;
++getfile_metrics->accepted_files;
diff --git a/extensions/standard-processors/processors/GetFile.h b/extensions/standard-processors/processors/GetFile.h
index f58518c03..d43e06d93 100644
--- a/extensions/standard-processors/processors/GetFile.h
+++ b/extensions/standard-processors/processors/GetFile.h
@@ -34,7 +34,7 @@
#include "core/Core.h"
#include "core/logging/LoggerFactory.h"
#include "minifi-cpp/utils/Export.h"
-#include "core/ProcessorMetrics.h"
+#include "minifi-cpp/core/ProcessorMetricsExtension.h"
namespace org::apache::nifi::minifi::processors {
@@ -52,29 +52,24 @@ struct GetFileRequest {
std::filesystem::path inputDirectory;
};
-class GetFileMetrics : public core::ProcessorMetricsImpl {
+class GetFileMetrics : public core::ProcessorMetricsExtension {
public:
- explicit GetFileMetrics(const core::ProcessorImpl& source_processor)
- : core::ProcessorMetricsImpl(source_processor) {
- }
-
std::vector<state::response::SerializedResponseNode> serialize() override {
- auto resp = core::ProcessorMetricsImpl::serialize();
- auto& root_node = resp[0];
+ std::vector<state::response::SerializedResponseNode> resp;
state::response::SerializedResponseNode
accepted_files_node{"AcceptedFiles", accepted_files.load()};
- root_node.children.push_back(accepted_files_node);
+ resp.push_back(accepted_files_node);
state::response::SerializedResponseNode input_bytes_node{"InputBytes",
input_bytes.load()};
- root_node.children.push_back(input_bytes_node);
+ resp.push_back(input_bytes_node);
return resp;
}
std::vector<state::PublishedMetric> calculateMetrics() override {
- auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
- metrics.push_back({"accepted_files",
static_cast<double>(accepted_files.load()), getCommonLabels()});
- metrics.push_back({"input_bytes", static_cast<double>(input_bytes.load()),
getCommonLabels()});
+ std::vector<state::PublishedMetric> metrics;
+ metrics.push_back({"accepted_files",
static_cast<double>(accepted_files.load()), {}});
+ metrics.push_back({"input_bytes", static_cast<double>(input_bytes.load()),
{}});
return metrics;
}
@@ -86,7 +81,7 @@ class GetFile : public core::ProcessorImpl {
public:
explicit GetFile(core::ProcessorMetadata metadata)
: ProcessorImpl(metadata) {
- metrics_ = gsl::make_not_null(std::make_shared<GetFileMetrics>(*this));
+ metrics_extension_ = std::make_shared<GetFileMetrics>();
}
~GetFile() override = default;
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 8116466c0..d176bb3b2 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -38,7 +38,7 @@
#include "provenance/Provenance.h"
#include "core/Relationship.h"
#include "minifi-cpp/utils/gsl.h"
-#include "minifi-cpp/core/ProcessorMetrics.h"
+#include "core/ProcessorMetrics.h"
#include "minifi-cpp/core/ProcessSession.h"
namespace org::apache::nifi::minifi::core::detail {
@@ -123,7 +123,7 @@ class ProcessSessionImpl : public ReferenceContainerImpl,
public virtual Process
bool existsFlowFileInRelationship(const Relationship &relationship) override;
- void setMetrics(const std::shared_ptr<ProcessorMetrics>& metrics) override {
+ void setMetrics(const std::shared_ptr<ProcessorMetrics>& metrics) {
metrics_ = metrics;
}
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 9dc850859..7069854a5 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -38,7 +38,7 @@
#include "minifi-cpp/core/DynamicProperty.h"
#include "minifi-cpp/core/Scheduling.h"
#include "minifi-cpp/core/state/nodes/MetricsBase.h"
-#include "minifi-cpp/core/ProcessorMetrics.h"
+#include "core/ProcessorMetrics.h"
#include "minifi-cpp/utils/gsl.h"
#include "utils/Id.h"
#include "minifi-cpp/core/OutputAttributeDefinition.h"
@@ -113,6 +113,7 @@ class Processor : public ConnectableImpl, public
ConfigurableComponentImpl, publ
[[nodiscard]] bool supportsDynamicRelationships() const override;
state::response::SharedResponseNode getResponseNode() override;
gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const;
+ std::shared_ptr<ProcessorMetricsExtension> getMetricsExtension() const;
std::string getProcessGroupName() const;
void setProcessGroupName(const std::string &name);
std::string getProcessGroupPath() const;
@@ -170,6 +171,8 @@ class Processor : public ConnectableImpl, public
ConfigurableComponentImpl, publ
std::string process_group_name_;
std::string process_group_path_;
+ gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
+
protected:
std::unique_ptr<ProcessorApi> impl_;
};
diff --git a/core-framework/include/core/ProcessorMetrics.h b/libminifi/include/core/ProcessorMetrics.h
similarity index 59%
rename from core-framework/include/core/ProcessorMetrics.h
rename to libminifi/include/core/ProcessorMetrics.h
index 40e0e6e0a..2a0edaf9b 100644
--- a/core-framework/include/core/ProcessorMetrics.h
+++ b/libminifi/include/core/ProcessorMetrics.h
@@ -23,9 +23,8 @@
#include <mutex>
#include <vector>
-#include "core/state/nodes/ResponseNode.h"
-#include "minifi-cpp/core/ProcessorMetrics.h"
#include "core/state/Value.h"
+#include "state/nodes/MetricsBase.h"
namespace org::apache::nifi::minifi::core {
@@ -35,42 +34,42 @@ concept Summable = requires(T x) { x + x; }; //
NOLINT(readability/braces)
template<typename T>
concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };
// NOLINT(readability/braces)
-class ProcessorImpl;
+class Processor;
-class ProcessorMetricsImpl : public state::response::ResponseNodeImpl, public
virtual ProcessorMetrics {
+class ProcessorMetrics : public state::response::ResponseNodeImpl {
public:
- explicit ProcessorMetricsImpl(const ProcessorImpl& source_processor);
+ explicit ProcessorMetrics(const Processor& source_processor);
[[nodiscard]] std::string getName() const override;
- std::vector<state::response::SerializedResponseNode> serialize() override;
- std::vector<state::PublishedMetric> calculateMetrics() override;
- void increaseRelationshipTransferCount(const std::string& relationship,
size_t count = 1) override;
- std::chrono::milliseconds getAverageOnTriggerRuntime() const override;
- std::chrono::milliseconds getLastOnTriggerRuntime() const override;
- void addLastOnTriggerRuntime(std::chrono::milliseconds runtime) override;
-
- std::chrono::milliseconds getAverageSessionCommitRuntime() const override;
- std::chrono::milliseconds getLastSessionCommitRuntime() const override;
- void addLastSessionCommitRuntime(std::chrono::milliseconds runtime) override;
- std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const
std::string& relationship) const override;
-
- std::atomic<size_t>& invocations() override {return invocations_;}
- const std::atomic<size_t>& invocations() const override {return
invocations_;}
- std::atomic<size_t>& incomingFlowFiles() override {return
incoming_flow_files_;}
- const std::atomic<size_t>& incomingFlowFiles() const override {return
incoming_flow_files_;}
- std::atomic<size_t>& transferredFlowFiles() override {return
transferred_flow_files_;}
- const std::atomic<size_t>& transferredFlowFiles() const override {return
transferred_flow_files_;}
- std::atomic<uint64_t>& incomingBytes() override {return incoming_bytes_;}
- const std::atomic<uint64_t>& incomingBytes() const override {return
incoming_bytes_;}
- std::atomic<uint64_t>& transferredBytes() override {return
transferred_bytes_;}
- const std::atomic<uint64_t>& transferredBytes() const override {return
transferred_bytes_;}
- std::atomic<uint64_t>& bytesRead() override {return bytes_read_;}
- const std::atomic<uint64_t>& bytesRead() const override {return bytes_read_;}
- std::atomic<uint64_t>& bytesWritten() override {return bytes_written_;}
- const std::atomic<uint64_t>& bytesWritten() const override {return
bytes_written_;}
- std::atomic<uint64_t>& processingNanos() override {return processing_nanos_;}
- const std::atomic<uint64_t>& processingNanos() const override {return
processing_nanos_;}
+ std::vector<state::response::SerializedResponseNode> serialize() final;
+ std::vector<state::PublishedMetric> calculateMetrics() final;
+ void increaseRelationshipTransferCount(const std::string& relationship,
size_t count = 1);
+ std::chrono::milliseconds getAverageOnTriggerRuntime() const;
+ std::chrono::milliseconds getLastOnTriggerRuntime() const;
+ void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
+
+ std::chrono::milliseconds getAverageSessionCommitRuntime() const;
+ std::chrono::milliseconds getLastSessionCommitRuntime() const;
+ void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);
+ std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const
std::string& relationship) const;
+
+ std::atomic<size_t>& invocations() {return invocations_;}
+ const std::atomic<size_t>& invocations() const {return invocations_;}
+ std::atomic<size_t>& incomingFlowFiles() {return incoming_flow_files_;}
+ const std::atomic<size_t>& incomingFlowFiles() const {return
incoming_flow_files_;}
+ std::atomic<size_t>& transferredFlowFiles() {return transferred_flow_files_;}
+ const std::atomic<size_t>& transferredFlowFiles() const {return
transferred_flow_files_;}
+ std::atomic<uint64_t>& incomingBytes() {return incoming_bytes_;}
+ const std::atomic<uint64_t>& incomingBytes() const {return incoming_bytes_;}
+ std::atomic<uint64_t>& transferredBytes() {return transferred_bytes_;}
+ const std::atomic<uint64_t>& transferredBytes() const {return
transferred_bytes_;}
+ std::atomic<uint64_t>& bytesRead() {return bytes_read_;}
+ const std::atomic<uint64_t>& bytesRead() const {return bytes_read_;}
+ std::atomic<uint64_t>& bytesWritten() {return bytes_written_;}
+ const std::atomic<uint64_t>& bytesWritten() const {return bytes_written_;}
+ std::atomic<uint64_t>& processingNanos() {return processing_nanos_;}
+ const std::atomic<uint64_t>& processingNanos() const {return
processing_nanos_;}
protected:
template<typename ValueType>
@@ -97,7 +96,7 @@ class ProcessorMetricsImpl : public
state::response::ResponseNodeImpl, public vi
mutable std::mutex transferred_relationships_mutex_;
std::unordered_map<std::string, size_t> transferred_relationships_;
- const ProcessorImpl& source_processor_;
+ const Processor& source_processor_;
Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
Averager<std::chrono::milliseconds> session_commit_runtime_averager_;
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h
index 510a457ef..004ab3fe5 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -30,6 +30,12 @@
namespace org::apache::nifi::minifi::state::response {
+class ResponseNodeSource {
+ public:
+ virtual ~ResponseNodeSource() = default;
+ virtual SharedResponseNode getResponseNode() = 0;
+};
+
class DeviceInformation : public ResponseNodeImpl {
public:
DeviceInformation(std::string_view name, const utils::Identifier& uuid)
diff --git a/core-framework/include/core/state/nodes/ResponseNode.h b/libminifi/include/core/state/nodes/ResponseNode.h
similarity index 100%
rename from core-framework/include/core/state/nodes/ResponseNode.h
rename to libminifi/include/core/state/nodes/ResponseNode.h
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 3967bddfb..f815898cd 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -23,6 +23,7 @@
#include "minifi-cpp/core/ProcessContext.h"
#include "minifi-cpp/core/ProcessSessionFactory.h"
#include "minifi-cpp/core/Property.h"
+#include "core/ProcessSession.h"
using namespace std::literals::chrono_literals;
@@ -48,7 +49,8 @@ utils::TaskRescheduleInfo
EventDrivenSchedulingAgent::run(core::Processor* proce
const auto start_time = std::chrono::steady_clock::now();
// trigger processor while it has work to do, but no more than the
configured nifi.flow.engine.event.driven.time.slice
- const auto process_session = session_factory->createSession();
+ const auto process_session =
std::dynamic_pointer_cast<core::ProcessSessionImpl>(session_factory->createSession());
+ gsl_Assert(process_session);
process_session->setMetrics(processor->getMetrics());
bool needs_commit = true;
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 622650ab7..5c0da6f2a 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -35,10 +35,10 @@
#include "minifi-cpp/core/ProcessorDescriptor.h"
#include "minifi-cpp/core/ProcessSessionFactory.h"
#include "minifi-cpp/utils/gsl.h"
+#include "core/ProcessSession.h"
#include "range/v3/algorithm/any_of.hpp"
#include "fmt/format.h"
#include "minifi-cpp/Exception.h"
-#include "minifi-cpp/core/ProcessorMetrics.h"
using namespace std::literals::chrono_literals;
@@ -59,6 +59,7 @@ Processor::Processor(std::string_view name,
std::unique_ptr<ProcessorApi> impl)
yield_period_(DEFAULT_YIELD_PERIOD_SECONDS),
active_tasks_(0),
logger_(logging::LoggerFactory<Processor>::getLogger(uuid_)),
+ metrics_(gsl::make_not_null(std::make_shared<ProcessorMetrics>(*this))),
impl_(std::move(impl)) {
has_work_.store(false);
// Setup the default values
@@ -77,6 +78,7 @@ Processor::Processor(std::string_view name, const
utils::Identifier& uuid, std::
yield_period_(DEFAULT_YIELD_PERIOD_SECONDS),
active_tasks_(0),
logger_(logging::LoggerFactory<Processor>::getLogger(uuid_)),
+ metrics_(gsl::make_not_null(std::make_shared<ProcessorMetrics>(*this))),
impl_(std::move(impl)) {
has_work_.store(false);
// Setup the default values
@@ -173,7 +175,8 @@ bool Processor::addConnection(Connectable* conn) {
}
void Processor::triggerAndCommit(const std::shared_ptr<ProcessContext>&
context, const std::shared_ptr<ProcessSessionFactory>& session_factory) {
- const auto process_session = session_factory->createSession();
+ const auto process_session =
std::dynamic_pointer_cast<core::ProcessSessionImpl>(session_factory->createSession());
+ gsl_Assert(process_session);
process_session->setMetrics(getMetrics());
try {
trigger(context, process_session);
@@ -191,10 +194,10 @@ void Processor::triggerAndCommit(const
std::shared_ptr<ProcessContext>& context,
}
void Processor::trigger(const std::shared_ptr<ProcessContext>& context, const
std::shared_ptr<ProcessSession>& process_session) {
- ++impl_->getMetrics()->invocations();
+ ++metrics_->invocations();
const auto start = std::chrono::steady_clock::now();
onTrigger(*context, *process_session);
-
impl_->getMetrics()->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- start));
+
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- start));
}
bool Processor::isWorkAvailable() {
@@ -531,11 +534,15 @@ annotation::Input Processor::getInputRequirement() const {
}
state::response::SharedResponseNode Processor::getResponseNode() {
- return getMetrics();
+ return metrics_;
}
gsl::not_null<std::shared_ptr<ProcessorMetrics>> Processor::getMetrics() const
{
- return impl_->getMetrics();
+ return metrics_;
+}
+
+std::shared_ptr<ProcessorMetricsExtension> Processor::getMetricsExtension()
const {
+ return impl_->getMetricsExtension();
}
void Processor::restore(const std::shared_ptr<FlowFile>& file) {
diff --git a/core-framework/src/core/ProcessorMetrics.cpp b/libminifi/src/core/ProcessorMetrics.cpp
similarity index 79%
rename from core-framework/src/core/ProcessorMetrics.cpp
rename to libminifi/src/core/ProcessorMetrics.cpp
index 5de69f200..b97ac71af 100644
--- a/core-framework/src/core/ProcessorMetrics.cpp
+++ b/libminifi/src/core/ProcessorMetrics.cpp
@@ -16,7 +16,7 @@
*/
#include "core/ProcessorMetrics.h"
-#include "core/ProcessorImpl.h"
+#include "core/Processor.h"
#include "core/state/Value.h"
#include "minifi-cpp/utils/gsl.h"
#include "range/v3/numeric/accumulate.hpp"
@@ -25,21 +25,21 @@ using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::core {
-ProcessorMetricsImpl::ProcessorMetricsImpl(const ProcessorImpl&
source_processor)
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
: source_processor_(source_processor),
on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT),
session_commit_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
}
-std::string ProcessorMetricsImpl::getName() const {
+std::string ProcessorMetrics::getName() const {
return source_processor_.getProcessorType() + "Metrics";
}
-std::unordered_map<std::string, std::string>
ProcessorMetricsImpl::getCommonLabels() const {
+std::unordered_map<std::string, std::string>
ProcessorMetrics::getCommonLabels() const {
return {{"metric_class", getName()}, {"processor_name",
source_processor_.getName()}, {"processor_uuid",
source_processor_.getUUIDStr()}};
}
-std::vector<state::response::SerializedResponseNode>
ProcessorMetricsImpl::serialize() {
+std::vector<state::response::SerializedResponseNode>
ProcessorMetrics::serialize() {
std::vector<state::response::SerializedResponseNode> resp;
state::response::SerializedResponseNode root_node {
@@ -74,10 +74,16 @@ std::vector<state::response::SerializedResponseNode>
ProcessorMetricsImpl::seria
resp.push_back(root_node);
+ if (auto metrics_extension = source_processor_.getMetricsExtension()) {
+ for (auto&& extension_child : metrics_extension->serialize()) {
+ resp[0].children.push_back(std::move(extension_child));
+ }
+ }
+
return resp;
}
-std::vector<state::PublishedMetric> ProcessorMetricsImpl::calculateMetrics() {
+std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() {
std::vector<state::PublishedMetric> metrics = {
{"onTrigger_invocations", static_cast<double>(invocations()),
getCommonLabels()},
{"average_onTrigger_runtime_milliseconds",
static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()},
@@ -101,39 +107,47 @@ std::vector<state::PublishedMetric>
ProcessorMetricsImpl::calculateMetrics() {
}
}
+ const auto common_labels = getCommonLabels();
+ if (auto metrics_extension = source_processor_.getMetricsExtension()) {
+ for (auto&& extension_node : metrics_extension->calculateMetrics()) {
+ extension_node.labels.insert(common_labels.begin(), common_labels.end());
+ metrics.push_back(std::move(extension_node));
+ }
+ }
+
return metrics;
}
-void ProcessorMetricsImpl::increaseRelationshipTransferCount(const
std::string& relationship, size_t count) {
+void ProcessorMetrics::increaseRelationshipTransferCount(const std::string&
relationship, size_t count) {
std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
transferred_relationships_[relationship] += count;
}
-std::chrono::milliseconds ProcessorMetricsImpl::getAverageOnTriggerRuntime()
const {
+std::chrono::milliseconds ProcessorMetrics::getAverageOnTriggerRuntime() const
{
return on_trigger_runtime_averager_.getAverage();
}
-void ProcessorMetricsImpl::addLastOnTriggerRuntime(std::chrono::milliseconds
runtime) {
+void ProcessorMetrics::addLastOnTriggerRuntime(std::chrono::milliseconds
runtime) {
on_trigger_runtime_averager_.addValue(runtime);
}
-std::chrono::milliseconds ProcessorMetricsImpl::getLastOnTriggerRuntime()
const {
+std::chrono::milliseconds ProcessorMetrics::getLastOnTriggerRuntime() const {
return on_trigger_runtime_averager_.getLastValue();
}
-std::chrono::milliseconds
ProcessorMetricsImpl::getAverageSessionCommitRuntime() const {
+std::chrono::milliseconds ProcessorMetrics::getAverageSessionCommitRuntime()
const {
return session_commit_runtime_averager_.getAverage();
}
-void
ProcessorMetricsImpl::addLastSessionCommitRuntime(std::chrono::milliseconds
runtime) {
+void ProcessorMetrics::addLastSessionCommitRuntime(std::chrono::milliseconds
runtime) {
session_commit_runtime_averager_.addValue(runtime);
}
-std::chrono::milliseconds ProcessorMetricsImpl::getLastSessionCommitRuntime()
const {
+std::chrono::milliseconds ProcessorMetrics::getLastSessionCommitRuntime()
const {
return session_commit_runtime_averager_.getLastValue();
}
-std::optional<size_t>
ProcessorMetricsImpl::getTransferredFlowFilesToRelationshipCount(const
std::string& relationship) const {
+std::optional<size_t>
ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string&
relationship) const {
std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
const auto relationship_it = transferred_relationships_.find(relationship);
if (relationship_it != std::end(transferred_relationships_)) {
@@ -144,7 +158,7 @@ std::optional<size_t>
ProcessorMetricsImpl::getTransferredFlowFilesToRelationshi
template<typename ValueType>
requires Summable<ValueType> && DividableByInteger<ValueType>
-ValueType ProcessorMetricsImpl::Averager<ValueType>::getAverage() const {
+ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const {
std::lock_guard<std::mutex> lock(average_value_mutex_);
if (values_.empty()) {
return {};
@@ -154,7 +168,7 @@ ValueType
ProcessorMetricsImpl::Averager<ValueType>::getAverage() const {
template<typename ValueType>
requires Summable<ValueType> && DividableByInteger<ValueType>
-void ProcessorMetricsImpl::Averager<ValueType>::addValue(ValueType runtime) {
+void ProcessorMetrics::Averager<ValueType>::addValue(ValueType runtime) {
std::lock_guard<std::mutex> lock(average_value_mutex_);
if (values_.size() < SAMPLE_SIZE_) {
values_.push_back(runtime);
@@ -169,7 +183,7 @@ void
ProcessorMetricsImpl::Averager<ValueType>::addValue(ValueType runtime) {
template<typename ValueType>
requires Summable<ValueType> && DividableByInteger<ValueType>
-ValueType ProcessorMetricsImpl::Averager<ValueType>::getLastValue() const {
+ValueType ProcessorMetrics::Averager<ValueType>::getLastValue() const {
std::lock_guard<std::mutex> lock(average_value_mutex_);
if (values_.empty()) {
return {};
diff --git a/libminifi/test/libtest/unit/TestBase.cpp
b/libminifi/test/libtest/unit/TestBase.cpp
index 135805d50..1fbc35cc8 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -548,8 +548,10 @@ bool TestPlan::runProcessor(size_t target_location, const
PreTriggerVerifier& ve
verify(context, current_session);
current_session->commit();
} else {
- const auto session_factory = std::make_shared<TestSessionFactory>(context,
[&] (const auto& current_session) {
- current_session->setMetrics(processor->getMetrics());
+ auto session_factory = std::make_shared<TestSessionFactory>(context, [&]
(const auto& current_session) {
+ auto session =
std::dynamic_pointer_cast<core::ProcessSessionImpl>(current_session);
+ gsl_Assert(session);
+ session->setMetrics(processor->getMetrics());
process_sessions_.push_back(current_session);
});
logger_->log_info("Running {}", processor->getName());
diff --git a/libminifi/test/unit/MetricsTests.cpp
b/libminifi/test/unit/MetricsTests.cpp
index d9c3b1b9d..e118fd12f 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -212,8 +212,8 @@ TEST_CASE("VolatileRepositorymetricsCanBeFull", "[c2m4]") {
}
TEST_CASE("Test on trigger runtime processor metrics", "[ProcessorMetrics]") {
- DummyProcessor dummy_processor("dummy");
- minifi::core::ProcessorMetricsImpl metrics(dummy_processor);
+ auto dummy_processor =
minifi::test::utils::make_processor<DummyProcessor>("dummy");
+ minifi::core::ProcessorMetrics metrics(*dummy_processor);
REQUIRE("DummyProcessorMetrics" == metrics.getName());
@@ -251,8 +251,8 @@ TEST_CASE("Test on trigger runtime processor metrics",
"[ProcessorMetrics]") {
}
TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") {
- DummyProcessor dummy_processor("dummy");
- minifi::core::ProcessorMetricsImpl metrics(dummy_processor);
+ auto dummy_processor =
minifi::test::utils::make_processor<DummyProcessor>("dummy");
+ minifi::core::ProcessorMetrics metrics(*dummy_processor);
REQUIRE("DummyProcessorMetrics" == metrics.getName());
diff --git a/minifi-api/include/minifi-cpp/core/ProcessSession.h
b/minifi-api/include/minifi-cpp/core/ProcessSession.h
index caf21779e..052d1eaf2 100644
--- a/minifi-api/include/minifi-cpp/core/ProcessSession.h
+++ b/minifi-api/include/minifi-cpp/core/ProcessSession.h
@@ -23,7 +23,6 @@
#include "FlowFile.h"
#include "minifi-cpp/provenance/Provenance.h"
-#include "ProcessorMetrics.h"
#include "minifi-cpp/io/StreamCallback.h"
namespace org::apache::nifi::minifi::core {
@@ -117,8 +116,6 @@ class ProcessSession : public virtual ReferenceContainer {
virtual bool existsFlowFileInRelationship(const Relationship &relationship)
= 0;
- virtual void setMetrics(const std::shared_ptr<ProcessorMetrics>& metrics) =
0;
-
virtual bool hasBeenTransferred(const core::FlowFile &flow) const = 0;
};
diff --git a/minifi-api/include/minifi-cpp/core/ProcessorApi.h
b/minifi-api/include/minifi-cpp/core/ProcessorApi.h
index 9fff5bf7b..ae39feae5 100644
--- a/minifi-api/include/minifi-cpp/core/ProcessorApi.h
+++ b/minifi-api/include/minifi-cpp/core/ProcessorApi.h
@@ -16,12 +16,8 @@
*/
#pragma once
-#include <chrono>
#include <memory>
-#include <mutex>
#include <string>
-#include <unordered_set>
-#include <unordered_map>
#include "ConfigurableComponent.h"
#include "Property.h"
@@ -30,9 +26,9 @@
#include "minifi-cpp/core/Annotation.h"
#include "Scheduling.h"
#include "minifi-cpp/core/state/nodes/MetricsBase.h"
-#include "ProcessorMetrics.h"
#include "minifi-cpp/utils/gsl.h"
#include "minifi-cpp/core/logging/Logger.h"
+#include "minifi-cpp/core/ProcessorMetricsExtension.h"
namespace org::apache::nifi::minifi {
@@ -66,7 +62,7 @@ class ProcessorApi {
virtual void onUnSchedule() = 0;
virtual void notifyStop() = 0;
virtual annotation::Input getInputRequirement() const = 0;
- virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const
= 0;
+ virtual std::shared_ptr<ProcessorMetricsExtension> getMetricsExtension()
const = 0;
virtual void setLoggerCallback(const std::function<void(logging::LOG_LEVEL
level, const std::string& message)>& callback) = 0;
};
diff --git a/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h
b/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h
deleted file mode 100644
index 8d19c1cd1..000000000
--- a/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <string>
-#include <chrono>
-#include <atomic>
-
-#include "minifi-cpp/core/state/nodes/MetricsBase.h"
-
-namespace org::apache::nifi::minifi::core {
-
-class ProcessorMetrics : public virtual state::response::ResponseNode {
- public:
- virtual void increaseRelationshipTransferCount(const std::string&
relationship, size_t count = 1) = 0;
- virtual std::chrono::milliseconds getAverageOnTriggerRuntime() const = 0;
- virtual std::chrono::milliseconds getLastOnTriggerRuntime() const = 0;
- virtual void addLastOnTriggerRuntime(std::chrono::milliseconds runtime) = 0;
-
- virtual std::chrono::milliseconds getAverageSessionCommitRuntime() const = 0;
- virtual std::chrono::milliseconds getLastSessionCommitRuntime() const = 0;
- virtual void addLastSessionCommitRuntime(std::chrono::milliseconds runtime)
= 0;
- virtual std::optional<size_t>
getTransferredFlowFilesToRelationshipCount(const std::string& relationship)
const = 0;
-
- virtual std::atomic<size_t>& invocations() = 0;
- virtual const std::atomic<size_t>& invocations() const = 0;
- virtual std::atomic<size_t>& incomingFlowFiles() = 0;
- virtual const std::atomic<size_t>& incomingFlowFiles() const = 0;
- virtual std::atomic<size_t>& transferredFlowFiles() = 0;
- virtual const std::atomic<size_t>& transferredFlowFiles() const = 0;
- virtual std::atomic<uint64_t>& incomingBytes() = 0;
- virtual const std::atomic<uint64_t>& incomingBytes() const = 0;
- virtual std::atomic<uint64_t>& transferredBytes() = 0;
- virtual const std::atomic<uint64_t>& transferredBytes() const = 0;
- virtual std::atomic<uint64_t>& bytesRead() = 0;
- virtual const std::atomic<uint64_t>& bytesRead() const = 0;
- virtual std::atomic<uint64_t>& bytesWritten() = 0;
- virtual const std::atomic<uint64_t>& bytesWritten() const = 0;
- virtual std::atomic<uint64_t>& processingNanos() = 0;
- virtual const std::atomic<uint64_t>& processingNanos() const = 0;
-};
-
-} // namespace org::apache::nifi::minifi::core
diff --git a/minifi-api/include/minifi-cpp/core/ProcessorMetricsExtension.h
b/minifi-api/include/minifi-cpp/core/ProcessorMetricsExtension.h
new file mode 100644
index 000000000..14f06658e
--- /dev/null
+++ b/minifi-api/include/minifi-cpp/core/ProcessorMetricsExtension.h
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+
+#include "minifi-cpp/core/state/Value.h"
+#include "minifi-cpp/core/state/PublishedMetricProvider.h"
+
+namespace org::apache::nifi::minifi::core {
+
+class ProcessorMetricsExtension : public state::PublishedMetricProvider {
+ public:
+ virtual std::vector<state::response::SerializedResponseNode> serialize() = 0;
+};
+
+} // namespace org::apache::nifi::minifi::core
diff --git a/minifi-api/include/minifi-cpp/core/state/nodes/MetricsBase.h
b/minifi-api/include/minifi-cpp/core/state/nodes/MetricsBase.h
index d8d390855..b5a090bbc 100644
--- a/minifi-api/include/minifi-cpp/core/state/nodes/MetricsBase.h
+++ b/minifi-api/include/minifi-cpp/core/state/nodes/MetricsBase.h
@@ -44,12 +44,6 @@ class ResponseNode : public virtual core::CoreComponent,
public virtual Publishe
virtual bool isEmpty() = 0;
};
-class ResponseNodeSource {
- public:
- virtual ~ResponseNodeSource() = default;
- virtual SharedResponseNode getResponseNode() = 0;
-};
-
class NodeReporter {
public:
struct ReportedNode {