This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch minifi-api
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 47a6468b5c0fc9ea5f1355f4f658e5c5571b610d
Author: Martin Zink <[email protected]>
AuthorDate: Thu Jan 23 16:13:39 2025 +0100

    Rebase fix
---
 libminifi/include/core/ProcessSession.h            |  2 ++
 libminifi/src/core/ProcessSession.cpp              | 22 ++++++------
 libminifi/src/core/state/nodes/FlowInformation.cpp | 32 ++++++++---------
 libminifi/test/unit/MetricsTests.cpp               | 42 +++++++++++-----------
 .../include/minifi-cpp/core/ProcessSession.h       |  2 ++
 minifi-api/include/minifi-cpp/core/Processor.h     |  2 ++
 .../include/minifi-cpp/core/ProcessorMetrics.h     | 29 +++++++--------
 utils/include/core/Processor.h                     |  4 +--
 utils/include/core/ProcessorMetrics.h              | 42 +++++++++++-----------
 utils/src/core/Processor.cpp                       |  2 +-
 utils/src/core/ProcessorMetrics.cpp                | 30 ++++++++--------
 11 files changed, 108 insertions(+), 101 deletions(-)

diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index b424acbce..38665c662 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -110,6 +110,8 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process
   void importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) override;
 
   void import(const std::string& source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0) override;
+  void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) override;
+  void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) override;
 
   bool exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) override;
 
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 0aa27752a..5526cda69 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -267,7 +267,7 @@ void ProcessSessionImpl::write(core::FlowFile &flow, const io::OutputStreamCallb
     auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
     provenance_report_->modifyContent(flow, details, duration);
     if (metrics_) {
-      metrics_->bytes_written += stream->size();
+      metrics_->bytesWritten() += stream->size();
     }
   } catch (const std::exception& exception) {
     logger_->log_debug("Caught Exception during process session write, type: {}, what: {}", typeid(exception).name(), exception.what());
@@ -319,7 +319,7 @@ void ProcessSessionImpl::append(const std::shared_ptr<core::FlowFile> &flow, con
     }
     flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback));
     if (metrics_) {
-      metrics_->bytes_written += stream->size() - stream_size_before_callback;
+      metrics_->bytesWritten() += stream->size() - stream_size_before_callback;
     }
 
     std::stringstream details;
@@ -379,7 +379,7 @@ int64_t ProcessSessionImpl::read(const core::FlowFile& flow_file, const io::Inpu
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
     if (metrics_) {
-      metrics_->bytes_read += ret;
+      metrics_->bytesRead() += ret;
     }
     return ret;
   } catch (const std::exception& exception) {
@@ -428,8 +428,8 @@ int64_t ProcessSessionImpl::readWrite(const std::shared_ptr<core::FlowFile> &flo
     flow->setOffset(0);
     flow->setResourceClaim(output_claim);
     if (metrics_) {
-      metrics_->bytes_written += read_write_result->bytes_written;
-      metrics_->bytes_read += read_write_result->bytes_read;
+      metrics_->bytesWritten() += read_write_result->bytes_written;
+      metrics_->bytesRead() += read_write_result->bytes_read;
     }
 
     return read_write_result->bytes_written;
@@ -498,7 +498,7 @@ void ProcessSessionImpl::importFrom(io::InputStream &stream, const std::shared_p
 
     content_stream->close();
     if (metrics_) {
-      metrics_->bytes_written += content_stream->size();
+      metrics_->bytesWritten() += content_stream->size();
     }
     std::stringstream details;
     details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
@@ -562,7 +562,7 @@ void ProcessSessionImpl::import(const std::string& source, const std::shared_ptr
 
         stream->close();
         if (metrics_) {
-          metrics_->bytes_written += stream->size();
+          metrics_->bytesWritten() += stream->size();
         }
         input.close();
         if (!keepSource) {
@@ -667,7 +667,7 @@ void ProcessSessionImpl::import(const std::string& source, std::vector<std::shar
             flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
         stream->close();
         if (metrics_) {
-          metrics_->bytes_written += stream->size();
+          metrics_->bytesWritten() += stream->size();
         }
         std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
@@ -953,7 +953,7 @@ void ProcessSessionImpl::commit() {
     if (metrics_) {
       auto time_delta = std::chrono::steady_clock::now() - commit_start_time;
       metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(time_delta));
-      metrics_->processing_nanos += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
+      metrics_->processingNanos() += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
     }
   } catch (const std::exception& exception) {
     logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what());
@@ -1165,8 +1165,8 @@ std::shared_ptr<core::FlowFile> ProcessSessionImpl::get() {
         ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
       }
       if (metrics_) {
-        metrics_->incoming_bytes += ret->getSize();
-        ++metrics_->incoming_flow_files;
+        metrics_->incomingBytes() += ret->getSize();
+        ++metrics_->incomingFlowFiles();
       }
       return ret;
     }
diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp
index f31b26c29..de8be246f 100644
--- a/libminifi/src/core/state/nodes/FlowInformation.cpp
+++ b/libminifi/src/core/state/nodes/FlowInformation.cpp
@@ -81,14 +81,14 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() {
         .children = {
           {.name = "id", .value = std::string{processor->getUUIDStr()}},
           {.name = "groupId", .value = processor->getProcessGroupUUIDStr()},
-          {.name = "bytesRead", .value = metrics->bytes_read.load()},
-          {.name = "bytesWritten", .value = metrics->bytes_written.load()},
-          {.name = "flowFilesIn", .value = metrics->incoming_flow_files.load()},
-          {.name = "flowFilesOut", .value = metrics->transferred_flow_files.load()},
-          {.name = "bytesIn", .value = metrics->incoming_bytes.load()},
-          {.name = "bytesOut", .value = metrics->transferred_bytes.load()},
-          {.name = "invocations", .value = metrics->invocations.load()},
-          {.name = "processingNanos", .value = metrics->processing_nanos.load()},
+          {.name = "bytesRead", .value = metrics->bytesRead().load()},
+          {.name = "bytesWritten", .value = metrics->bytesWritten().load()},
+          {.name = "flowFilesIn", .value = metrics->incomingFlowFiles().load()},
+          {.name = "flowFilesOut", .value = metrics->transferredFlowFiles().load()},
+          {.name = "bytesIn", .value = metrics->incomingBytes().load()},
+          {.name = "bytesOut", .value = metrics->transferredBytes().load()},
+          {.name = "invocations", .value = metrics->invocations().load()},
+          {.name = "processingNanos", .value = metrics->processingNanos().load()},
           {.name = "activeThreadCount", .value = -1},
           {.name = "terminatedThreadCount", .value = -1},
           {.name = "runStatus", .value = (processor->isRunning() ? "Running" : "Stopped")}
@@ -115,21 +115,21 @@ std::vector<PublishedMetric> FlowInformation::calculateMetrics() {
       continue;
     }
     auto processor_metrics = processor->getMetrics();
-    metrics.push_back({"bytes_read", gsl::narrow<double>(processor_metrics->bytes_read.load()),
+    metrics.push_back({"bytes_read", gsl::narrow<double>(processor_metrics->bytesRead().load()),
         {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
-    metrics.push_back({"bytes_written", gsl::narrow<double>(processor_metrics->bytes_written.load()),
+    metrics.push_back({"bytes_written", gsl::narrow<double>(processor_metrics->bytesWritten().load()),
         {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
-    metrics.push_back({"flow_files_in", gsl::narrow<double>(processor_metrics->incoming_flow_files.load()),
+    metrics.push_back({"flow_files_in", gsl::narrow<double>(processor_metrics->incomingFlowFiles().load()),
         {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
-    metrics.push_back({"flow_files_out", gsl::narrow<double>(processor_metrics->transferred_flow_files.load()),
+    metrics.push_back({"flow_files_out", gsl::narrow<double>(processor_metrics->transferredFlowFiles().load()),
         {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
-    metrics.push_back({"bytes_in", gsl::narrow<double>(processor_metrics->incoming_bytes.load()),
+    metrics.push_back({"bytes_in", gsl::narrow<double>(processor_metrics->incomingBytes().load()),
         {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
-    metrics.push_back({"bytes_out", gsl::narrow<double>(processor_metrics->transferred_bytes.load()),
+    metrics.push_back({"bytes_out", gsl::narrow<double>(processor_metrics->transferredBytes().load()),
         {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
-    metrics.push_back({"invocations", gsl::narrow<double>(processor_metrics->invocations.load()),
+    metrics.push_back({"invocations", gsl::narrow<double>(processor_metrics->invocations().load()),
         {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
-    metrics.push_back({"processing_nanos", gsl::narrow<double>(processor_metrics->processing_nanos.load()),
+    metrics.push_back({"processing_nanos", gsl::narrow<double>(processor_metrics->processingNanos().load()),
         {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
     metrics.push_back({"is_running", (processor->isRunning() ? 1.0 : 0.0),
         {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp
index f8c986af4..806970b07 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -287,12 +287,12 @@ TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") {
   REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms);
 }
 
-class DuplicateContentProcessor : public minifi::core::Processor {
-  using minifi::core::Processor::Processor;
+class DuplicateContentProcessor : public minifi::core::ProcessorImpl {
+  using minifi::core::ProcessorImpl::ProcessorImpl;
 
  public:
-  DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : Processor(name, uuid) {}
-  explicit DuplicateContentProcessor(std::string_view name) : Processor(name) {}
+  DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : ProcessorImpl(name, uuid) {}
+  explicit DuplicateContentProcessor(std::string_view name) : ProcessorImpl(name) {}
   static constexpr const char* Description = "A processor that creates two more of the same flow file.";
   static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
   static constexpr auto Success = core::RelationshipDefinition{"success", "Newly created FlowFiles"};
@@ -333,29 +333,29 @@ TEST_CASE("Test processor metrics change after trigger", "[ProcessorMetrics]") {
   minifi::test::SingleProcessorTestController test_controller(std::make_unique<DuplicateContentProcessor>("DuplicateContentProcessor"));
   test_controller.trigger({minifi::test::InputFlowFileData{"log line 1", {}}});
   auto metrics = test_controller.getProcessor()->getMetrics();
-  CHECK(metrics->invocations == 1);
-  CHECK(metrics->incoming_flow_files == 1);
-  CHECK(metrics->transferred_flow_files == 2);
+  CHECK(metrics->invocations() == 1);
+  CHECK(metrics->incomingFlowFiles() == 1);
+  CHECK(metrics->transferredFlowFiles() == 2);
   CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 1);
   CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 1);
-  CHECK(metrics->incoming_bytes == 10);
-  CHECK(metrics->transferred_bytes == 30);
-  CHECK(metrics->bytes_read == 10);
-  CHECK(metrics->bytes_written == 20);
-  auto old_nanos = metrics->processing_nanos.load();
-  CHECK(metrics->processing_nanos > 0);
+  CHECK(metrics->incomingBytes() == 10);
+  CHECK(metrics->transferredBytes() == 30);
+  CHECK(metrics->bytesRead() == 10);
+  CHECK(metrics->bytesWritten() == 20);
+  auto old_nanos = metrics->processingNanos().load();
+  CHECK(metrics->processingNanos() > 0);
 
   test_controller.trigger({minifi::test::InputFlowFileData{"new log line 2", {}}});
-  CHECK(metrics->invocations == 2);
-  CHECK(metrics->incoming_flow_files == 2);
-  CHECK(metrics->transferred_flow_files == 4);
+  CHECK(metrics->invocations() == 2);
+  CHECK(metrics->incomingFlowFiles() == 2);
+  CHECK(metrics->transferredFlowFiles() == 4);
   CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 2);
   CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 2);
-  CHECK(metrics->incoming_bytes == 24);
-  CHECK(metrics->transferred_bytes == 72);
-  CHECK(metrics->bytes_read == 24);
-  CHECK(metrics->bytes_written == 48);
-  CHECK(metrics->processing_nanos > old_nanos);
+  CHECK(metrics->incomingBytes() == 24);
+  CHECK(metrics->transferredBytes() == 72);
+  CHECK(metrics->bytesRead() == 24);
+  CHECK(metrics->bytesWritten() == 48);
+  CHECK(metrics->processingNanos() > old_nanos);
 }
 
 
diff --git a/minifi-api/include/minifi-cpp/core/ProcessSession.h b/minifi-api/include/minifi-cpp/core/ProcessSession.h
index e19af3b18..196357e21 100644
--- a/minifi-api/include/minifi-cpp/core/ProcessSession.h
+++ b/minifi-api/include/minifi-cpp/core/ProcessSession.h
@@ -97,6 +97,8 @@ class ProcessSession : public virtual ReferenceContainer {
 
   // import from the data source.
   virtual void import(const std::string& source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0) = 0;
+  virtual void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) = 0;
+  virtual void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) = 0;
 
   /**
    * Exports the data stream to a file
diff --git a/minifi-api/include/minifi-cpp/core/Processor.h b/minifi-api/include/minifi-cpp/core/Processor.h
index 092df1426..52b9ab6c9 100644
--- a/minifi-api/include/minifi-cpp/core/Processor.h
+++ b/minifi-api/include/minifi-cpp/core/Processor.h
@@ -83,6 +83,8 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone
   virtual void validateAnnotations() const = 0;
   virtual annotation::Input getInputRequirement() const = 0;
   virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const = 0;
+  virtual std::string getProcessGroupUUIDStr() const = 0;
+  virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0;
 
   virtual void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) = 0;
   virtual const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const = 0;
diff --git a/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h b/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h
index 5e4825731..8d19c1cd1 100644
--- a/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h
+++ b/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h
@@ -34,23 +34,24 @@ class ProcessorMetrics : public virtual state::response::ResponseNode {
   virtual std::chrono::milliseconds getAverageSessionCommitRuntime() const = 0;
   virtual std::chrono::milliseconds getLastSessionCommitRuntime() const = 0;
   virtual void addLastSessionCommitRuntime(std::chrono::milliseconds runtime) = 0;
+  virtual std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const = 0;
 
   virtual std::atomic<size_t>& invocations() = 0;
   virtual const std::atomic<size_t>& invocations() const = 0;
-  virtual std::atomic<size_t>& incoming_flow_files() = 0;
-  virtual const std::atomic<size_t>& incoming_flow_files() const = 0;
-  virtual std::atomic<size_t>& transferred_flow_files() = 0;
-  virtual const std::atomic<size_t>& transferred_flow_files() const = 0;
-  virtual std::atomic<uint64_t>& incoming_bytes() = 0;
-  virtual const std::atomic<uint64_t>& incoming_bytes() const = 0;
-  virtual std::atomic<uint64_t>& transferred_bytes() = 0;
-  virtual const std::atomic<uint64_t>& transferred_bytes() const = 0;
-  virtual std::atomic<uint64_t>& bytes_read() = 0;
-  virtual const std::atomic<uint64_t>& bytes_read() const = 0;
-  virtual std::atomic<uint64_t>& bytes_written() = 0;
-  virtual const std::atomic<uint64_t>& bytes_written() const = 0;
-  virtual std::atomic<uint64_t>& processing_nanos() = 0;
-  virtual const std::atomic<uint64_t>& processing_nanos() const = 0;
+  virtual std::atomic<size_t>& incomingFlowFiles() = 0;
+  virtual const std::atomic<size_t>& incomingFlowFiles() const = 0;
+  virtual std::atomic<size_t>& transferredFlowFiles() = 0;
+  virtual const std::atomic<size_t>& transferredFlowFiles() const = 0;
+  virtual std::atomic<uint64_t>& incomingBytes() = 0;
+  virtual const std::atomic<uint64_t>& incomingBytes() const = 0;
+  virtual std::atomic<uint64_t>& transferredBytes() = 0;
+  virtual const std::atomic<uint64_t>& transferredBytes() const = 0;
+  virtual std::atomic<uint64_t>& bytesRead() = 0;
+  virtual const std::atomic<uint64_t>& bytesRead() const = 0;
+  virtual std::atomic<uint64_t>& bytesWritten() = 0;
+  virtual const std::atomic<uint64_t>& bytesWritten() const = 0;
+  virtual std::atomic<uint64_t>& processingNanos() = 0;
+  virtual const std::atomic<uint64_t>& processingNanos() const = 0;
 };
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/utils/include/core/Processor.h b/utils/include/core/Processor.h
index 336f13a59..4a17fcc9b 100644
--- a/utils/include/core/Processor.h
+++ b/utils/include/core/Processor.h
@@ -167,11 +167,11 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C
     active_tasks_ = 0;
   }
 
-  std::string getProcessGroupUUIDStr() const {
+  std::string getProcessGroupUUIDStr() const override {
     return process_group_uuid_;
   }
 
-  void setProcessGroupUUIDStr(const std::string &uuid) {
+  void setProcessGroupUUIDStr(const std::string &uuid) override {
     process_group_uuid_ = uuid;
   }
 
diff --git a/utils/include/core/ProcessorMetrics.h b/utils/include/core/ProcessorMetrics.h
index 3d1ebfb45..030d2ae34 100644
--- a/utils/include/core/ProcessorMetrics.h
+++ b/utils/include/core/ProcessorMetrics.h
@@ -49,27 +49,27 @@ class ProcessorMetricsImpl : public state::response::ResponseNodeImpl, public vi
   std::chrono::milliseconds getLastOnTriggerRuntime() const override;
   void addLastOnTriggerRuntime(std::chrono::milliseconds runtime) override;
 
-  std::chrono::milliseconds getAverageSessionCommitRuntime() const;
-  std::chrono::milliseconds getLastSessionCommitRuntime() const;
-  void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);
-  std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const;
-
-  std::atomic<size_t>& invocations() {return invocations_;}
-  const std::atomic<size_t>& invocations() const {return invocations_;}
-  std::atomic<size_t>& incoming_flow_files() {return incoming_flow_files_;}
-  const std::atomic<size_t>& incoming_flow_files() const {return incoming_flow_files_;}
-  std::atomic<size_t>& transferred_flow_files() {return transferred_flow_files_;}
-  const std::atomic<size_t>& transferred_flow_files() const {return transferred_flow_files_;}
-  std::atomic<uint64_t>& incoming_bytes() {return incoming_bytes_;}
-  const std::atomic<uint64_t>& incoming_bytes() const {return incoming_bytes_;}
-  std::atomic<uint64_t>& transferred_bytes() {return transferred_bytes_;}
-  const std::atomic<uint64_t>& transferred_bytes() const {return transferred_bytes_;}
-  std::atomic<uint64_t>& bytes_read() {return bytes_read_;}
-  const std::atomic<uint64_t>& bytes_read() const {return bytes_read_;}
-  std::atomic<uint64_t>& bytes_written() {return bytes_written_;}
-  const std::atomic<uint64_t>& bytes_written() const {return bytes_written_;}
-  std::atomic<uint64_t>& processing_nanos() {return processing_nanos_;}
-  const std::atomic<uint64_t>& processing_nanos() const {return processing_nanos_;}
+  std::chrono::milliseconds getAverageSessionCommitRuntime() const override;
+  std::chrono::milliseconds getLastSessionCommitRuntime() const override;
+  void addLastSessionCommitRuntime(std::chrono::milliseconds runtime) override;
+  std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const override;
+
+  std::atomic<size_t>& invocations() override {return invocations_;}
+  const std::atomic<size_t>& invocations() const override {return invocations_;}
+  std::atomic<size_t>& incomingFlowFiles() override {return incoming_flow_files_;}
+  const std::atomic<size_t>& incomingFlowFiles() const override {return incoming_flow_files_;}
+  std::atomic<size_t>& transferredFlowFiles() override {return transferred_flow_files_;}
+  const std::atomic<size_t>& transferredFlowFiles() const override {return transferred_flow_files_;}
+  std::atomic<uint64_t>& incomingBytes() override {return incoming_bytes_;}
+  const std::atomic<uint64_t>& incomingBytes() const override {return incoming_bytes_;}
+  std::atomic<uint64_t>& transferredBytes() override {return transferred_bytes_;}
+  const std::atomic<uint64_t>& transferredBytes() const override {return transferred_bytes_;}
+  std::atomic<uint64_t>& bytesRead() override {return bytes_read_;}
+  const std::atomic<uint64_t>& bytesRead() const override {return bytes_read_;}
+  std::atomic<uint64_t>& bytesWritten() override {return bytes_written_;}
+  const std::atomic<uint64_t>& bytesWritten() const override {return bytes_written_;}
+  std::atomic<uint64_t>& processingNanos() override {return processing_nanos_;}
+  const std::atomic<uint64_t>& processingNanos() const override {return processing_nanos_;}
 
  protected:
   template<typename ValueType>
diff --git a/utils/src/core/Processor.cpp b/utils/src/core/Processor.cpp
index 1e17f80d4..cf2eb6b53 100644
--- a/utils/src/core/Processor.cpp
+++ b/utils/src/core/Processor.cpp
@@ -201,7 +201,7 @@ void ProcessorImpl::triggerAndCommit(const std::shared_ptr<ProcessContext>& cont
 }
 
 void ProcessorImpl::trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) {
-  ++metrics_->invocations;
+  ++metrics_->invocations();
   const auto start = std::chrono::steady_clock::now();
   onTrigger(*context, *process_session);
   metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
diff --git a/utils/src/core/ProcessorMetrics.cpp b/utils/src/core/ProcessorMetrics.cpp
index 81e66c2c9..f615be769 100644
--- a/utils/src/core/ProcessorMetrics.cpp
+++ b/utils/src/core/ProcessorMetrics.cpp
@@ -49,13 +49,13 @@ std::vector<state::response::SerializedResponseNode> ProcessorMetricsImpl::seria
       {.name = "LastOnTriggerRunTime", .value = static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
       {.name = "AverageSessionCommitRunTime", .value = static_cast<uint64_t>(getAverageSessionCommitRuntime().count())},
       {.name = "LastSessionCommitRunTime", .value = static_cast<uint64_t>(getLastSessionCommitRuntime().count())},
-      {.name = "TransferredFlowFiles", .value = static_cast<uint32_t>(transferred_flow_files())},
-      {.name = "TransferredBytes", .value = static_cast<uint64_t>(transferred_bytes())},
-      {.name = "IncomingFlowFiles", .value = static_cast<uint32_t>(incoming_flow_files())},
-      {.name = "IncomingBytes", .value = static_cast<uint64_t>(incoming_bytes())},
-      {.name = "BytesRead", .value = static_cast<uint64_t>(bytes_read())},
-      {.name = "BytesWritten", .value = static_cast<uint64_t>(bytes_written())},
-      {.name = "ProcessingNanos", .value = static_cast<uint64_t>(processing_nanos())}
+      {.name = "TransferredFlowFiles", .value = static_cast<uint32_t>(transferredFlowFiles())},
+      {.name = "TransferredBytes", .value = static_cast<uint64_t>(transferredBytes())},
+      {.name = "IncomingFlowFiles", .value = static_cast<uint32_t>(incomingFlowFiles())},
+      {.name = "IncomingBytes", .value = static_cast<uint64_t>(incomingBytes())},
+      {.name = "BytesRead", .value = static_cast<uint64_t>(bytesRead())},
+      {.name = "BytesWritten", .value = static_cast<uint64_t>(bytesWritten())},
+      {.name = "ProcessingNanos", .value = static_cast<uint64_t>(processingNanos())}
     }
   };
 
@@ -83,13 +83,13 @@ std::vector<state::PublishedMetric> ProcessorMetricsImpl::calculateMetrics() {
     {"last_onTrigger_runtime_milliseconds", static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()},
     {"average_session_commit_runtime_milliseconds", static_cast<double>(getAverageSessionCommitRuntime().count()), getCommonLabels()},
     {"last_session_commit_runtime_milliseconds", static_cast<double>(getLastSessionCommitRuntime().count()), getCommonLabels()},
-    {"transferred_flow_files", static_cast<double>(transferred_flow_files()), getCommonLabels()},
-    {"transferred_bytes", static_cast<double>(transferred_bytes()), getCommonLabels()},
-    {"incoming_flow_files", static_cast<double>(incoming_flow_files()), getCommonLabels()},
-    {"incoming_bytes", static_cast<double>(incoming_bytes()), getCommonLabels()},
-    {"bytes_read", static_cast<double>(bytes_read()), getCommonLabels()},
-    {"bytes_written", static_cast<double>(bytes_written()), getCommonLabels()},
-    {"processing_nanos", static_cast<double>(processing_nanos()), getCommonLabels()}
+    {"transferred_flow_files", static_cast<double>(transferredFlowFiles()), getCommonLabels()},
+    {"transferred_bytes", static_cast<double>(transferredBytes()), getCommonLabels()},
+    {"incoming_flow_files", static_cast<double>(incomingFlowFiles()), getCommonLabels()},
+    {"incoming_bytes", static_cast<double>(incomingBytes()), getCommonLabels()},
+    {"bytes_read", static_cast<double>(bytesRead()), getCommonLabels()},
+    {"bytes_written", static_cast<double>(bytesWritten()), getCommonLabels()},
+    {"processing_nanos", static_cast<double>(processingNanos()), getCommonLabels()}
   };
 
   {
@@ -132,7 +132,7 @@ std::chrono::milliseconds ProcessorMetricsImpl::getLastSessionCommitRuntime() co
   return session_commit_runtime_averager_.getLastValue();
 }
 
-std::optional<size_t> ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const {
+std::optional<size_t> ProcessorMetricsImpl::getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const {
   std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
   const auto relationship_it = transferred_relationships_.find(relationship);
   if (relationship_it != std::end(transferred_relationships_)) {

Reply via email to