This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f262e74170 NIFI-13439 Added Performance Status to Group Status responses
f262e74170 is described below

commit f262e741704d1ab8bd314772445f5cbac609d533
Author: Timea Barna <[email protected]>
AuthorDate: Thu Jun 27 07:41:59 2024 +0200

    NIFI-13439 Added Performance Status to Group Status responses
    
    - Added ProcessingPerformanceStatus to nifi-api
    - Added Performance Status to Process Group and Processor Sources for Query NiFi Reporting Task
    
    This closes #9014
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/controller/status/ProcessGroupStatus.java |  28 ++++++
 .../status/ProcessingPerformanceStatus.java        | 107 +++++++++++++++++++++
 .../nifi/controller/status/ProcessorStatus.java    |  12 +++
 .../datasources/ProcessGroupStatusDataSource.java  |  14 ++-
 .../sql/datasources/ProcessorStatusDataSource.java |  14 ++-
 .../reporting/sql/TestQueryNiFiReportingTask.java  |   2 +-
 .../dto/status/ProcessGroupStatusSnapshotDTO.java  |  14 +++
 .../dto/status/ProcessingPerformanceStatusDTO.java | 100 +++++++++++++++++++
 .../api/dto/status/ProcessorStatusSnapshotDTO.java |  13 +++
 .../manager/ProcessingPerformanceStatusMerger.java |  34 +++++++
 .../apache/nifi/cluster/manager/StatusMerger.java  |   6 ++
 .../apache/nifi/reporting/AbstractEventAccess.java |  27 ++++++
 .../nifi/reporting/PerformanceMetricsUtil.java     |  37 +++++++
 .../org/apache/nifi/web/api/dto/DtoFactory.java    |  26 +++++
 14 files changed, 429 insertions(+), 5 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
index 8709055305..670af28449 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
@@ -57,6 +57,8 @@ public class ProcessGroupStatus implements Cloneable {
     private Collection<PortStatus> inputPortStatus = new ArrayList<>();
     private Collection<PortStatus> outputPortStatus = new ArrayList<>();
 
+    private ProcessingPerformanceStatus processingPerformanceStatus;
+
     public String getId() {
         return id;
     }
@@ -273,6 +275,14 @@ public class ProcessGroupStatus implements Cloneable {
         this.processingNanos = processingNanos;
     }
 
+    public ProcessingPerformanceStatus getProcessingPerformanceStatus() {
+        return processingPerformanceStatus;
+    }
+
+    public void setProcessingPerformanceStatus(ProcessingPerformanceStatus 
processingPerformanceStatus) {
+        this.processingPerformanceStatus = processingPerformanceStatus;
+    }
+
     @Override
     public ProcessGroupStatus clone() {
         final ProcessGroupStatus clonedObj = new ProcessGroupStatus();
@@ -296,6 +306,7 @@ public class ProcessGroupStatus implements Cloneable {
         clonedObj.flowFilesTransferred = flowFilesTransferred;
         clonedObj.bytesTransferred = bytesTransferred;
         clonedObj.processingNanos = processingNanos;
+        clonedObj.processingPerformanceStatus = processingPerformanceStatus;
 
         if (connectionStatus != null) {
             final Collection<ConnectionStatus> statusList = new ArrayList<>();
@@ -418,6 +429,9 @@ public class ProcessGroupStatus implements Cloneable {
             builder.append(status);
         }
 
+        builder.append(", processingPerformanceStatus=");
+        builder.append(processingPerformanceStatus);
+
         builder.append("]");
         return builder.toString();
     }
@@ -620,6 +634,20 @@ public class ProcessGroupStatus implements Cloneable {
         }
 
         target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values());
+
+        final ProcessingPerformanceStatus targetPerformanceStatus = target.getProcessingPerformanceStatus();
+        final ProcessingPerformanceStatus toMergePerformanceStatus = toMerge.getProcessingPerformanceStatus();
+
+        if (targetPerformanceStatus != null && toMergePerformanceStatus != null) {
+            targetPerformanceStatus.setIdentifier(toMergePerformanceStatus.getIdentifier());
+            targetPerformanceStatus.setCpuDuration(targetPerformanceStatus.getCpuDuration() + toMergePerformanceStatus.getCpuDuration());
+            targetPerformanceStatus.setContentReadDuration(targetPerformanceStatus.getContentReadDuration() + toMergePerformanceStatus.getContentReadDuration());
+            targetPerformanceStatus.setContentWriteDuration(targetPerformanceStatus.getContentWriteDuration() + toMergePerformanceStatus.getContentWriteDuration());
+            targetPerformanceStatus.setSessionCommitDuration(targetPerformanceStatus.getSessionCommitDuration() + toMergePerformanceStatus.getSessionCommitDuration());
+            targetPerformanceStatus.setGarbageCollectionDuration(targetPerformanceStatus.getGarbageCollectionDuration() + toMergePerformanceStatus.getGarbageCollectionDuration());
+        } else {
+            target.setProcessingPerformanceStatus(targetPerformanceStatus);
+        }
     }
 
     public static FlowFileAvailability mergeFlowFileAvailability(final 
FlowFileAvailability availabilityA, final FlowFileAvailability availabilityB) {
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessingPerformanceStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessingPerformanceStatus.java
new file mode 100644
index 0000000000..e87894c343
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessingPerformanceStatus.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status;
+
+public class ProcessingPerformanceStatus implements Cloneable {
+
+    private String identifier;
+    private long cpuDuration;
+    private long contentReadDuration;
+    private long contentWriteDuration;
+    private long sessionCommitDuration;
+    private long garbageCollectionDuration;
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public void setIdentifier(String identifier) {
+        this.identifier = identifier;
+    }
+
+    public long getCpuDuration() {
+        return cpuDuration;
+    }
+
+    public void setCpuDuration(long cpuDuration) {
+        this.cpuDuration = cpuDuration;
+    }
+
+    public long getContentReadDuration() {
+        return contentReadDuration;
+    }
+
+    public void setContentReadDuration(long contentReadDuration) {
+        this.contentReadDuration = contentReadDuration;
+    }
+
+    public long getContentWriteDuration() {
+        return contentWriteDuration;
+    }
+
+    public void setContentWriteDuration(long contentWriteDuration) {
+        this.contentWriteDuration = contentWriteDuration;
+    }
+
+    public long getSessionCommitDuration() {
+        return sessionCommitDuration;
+    }
+
+    public void setSessionCommitDuration(long sessionCommitDuration) {
+        this.sessionCommitDuration = sessionCommitDuration;
+    }
+
+    public long getGarbageCollectionDuration() {
+        return garbageCollectionDuration;
+    }
+
+    public void setGarbageCollectionDuration(long garbageCollectionDuration) {
+        this.garbageCollectionDuration = garbageCollectionDuration;
+    }
+
+    @Override
+    public ProcessingPerformanceStatus clone() {
+        final ProcessingPerformanceStatus clonedObj = new 
ProcessingPerformanceStatus();
+
+        clonedObj.identifier = identifier;
+        clonedObj.cpuDuration = cpuDuration;
+        clonedObj.contentReadDuration = contentReadDuration;
+        clonedObj.contentWriteDuration = contentWriteDuration;
+        clonedObj.sessionCommitDuration = sessionCommitDuration;
+        clonedObj.garbageCollectionDuration = garbageCollectionDuration;
+        return clonedObj;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("ProcessorPerformanceStatus [Group ID= ");
+        builder.append(identifier);
+        builder.append(", cpuDuration= ");
+        builder.append(cpuDuration);
+        builder.append(", contentReadDuration= ");
+        builder.append(contentReadDuration);
+        builder.append(", contentWriteDuration= ");
+        builder.append(contentWriteDuration);
+        builder.append(", sessionCommitDuration= ");
+        builder.append(sessionCommitDuration);
+        builder.append(", garbageCollectionDuration= ");
+        builder.append(garbageCollectionDuration);
+        builder.append("]");
+        return builder.toString();
+    }
+}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
index ba90534239..380829d38f 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
@@ -48,6 +48,7 @@ public class ProcessorStatus implements Cloneable {
     private int flowFilesSent;
     private long bytesSent;
     private Map<String, Long> counters;
+    private ProcessingPerformanceStatus processingPerformanceStatus;
 
     public String getId() {
         return id;
@@ -241,6 +242,14 @@ public class ProcessorStatus implements Cloneable {
         this.counters = counters;
     }
 
+    public ProcessingPerformanceStatus getProcessingPerformanceStatus() {
+        return processingPerformanceStatus;
+    }
+
+    public void setProcessingPerformanceStatus(ProcessingPerformanceStatus 
processingPerformanceStatus) {
+        this.processingPerformanceStatus = processingPerformanceStatus;
+    }
+
     @Override
     public ProcessorStatus clone() {
         final ProcessorStatus clonedObj = new ProcessorStatus();
@@ -267,6 +276,7 @@ public class ProcessorStatus implements Cloneable {
         clonedObj.executionNode = executionNode;
         clonedObj.type = type;
         clonedObj.counters = counters == null ? null : new HashMap<>(counters);
+        clonedObj.processingPerformanceStatus = processingPerformanceStatus;
         return clonedObj;
     }
 
@@ -307,6 +317,8 @@ public class ProcessorStatus implements Cloneable {
         builder.append(terminatedThreadCount);
         builder.append(", counters=");
         builder.append(counters);
+        builder.append(", processingPerformanceStatus=");
+        builder.append(processingPerformanceStatus);
         builder.append("]");
         return builder.toString();
     }
diff --git a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
index 614682ff2f..9db3963d99 100644
--- a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
+++ b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
@@ -50,7 +50,12 @@ public class ProcessGroupStatusDataSource implements 
ResettableDataSource {
         new ColumnSchema("activeThreadCount", int.class, false),
         new ColumnSchema("terminatedThreadCount", int.class, false),
         new ColumnSchema("versionedFlowState", String.class, false),
-        new ColumnSchema("processingNanos", long.class, false)
+        new ColumnSchema("processingNanos", long.class, false),
+        new ColumnSchema("cpuDuration", long.class, false),
+        new ColumnSchema("contentReadDuration", long.class, false),
+        new ColumnSchema("contentWriteDuration", long.class, false),
+        new ColumnSchema("sessionCommitDuration", long.class, false),
+        new ColumnSchema("garbageCollectionDuration", long.class, false)
     ));
 
 
@@ -123,7 +128,12 @@ public class ProcessGroupStatusDataSource implements 
ResettableDataSource {
             status.getTerminatedThreadCount(),
             status.getQueuedCount(),
             status.getVersionedFlowState() == null ? null : status.getVersionedFlowState().name(),
-            status.getProcessingNanos()
+            status.getProcessingNanos(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getCpuDuration(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getContentReadDuration(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getContentWriteDuration(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getSessionCommitDuration(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getGarbageCollectionDuration()
         };
     }
 
diff --git a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessorStatusDataSource.java b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessorStatusDataSource.java
index e3aade120a..ba19cce713 100644
--- a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessorStatusDataSource.java
+++ b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessorStatusDataSource.java
@@ -53,7 +53,12 @@ public class ProcessorStatusDataSource implements 
ResettableDataSource {
         new ColumnSchema("invocations", int.class, false),
         new ColumnSchema("processingNanos", long.class, false),
         new ColumnSchema("runStatus", String.class, false),
-        new ColumnSchema("executionNode", String.class, false)
+        new ColumnSchema("executionNode", String.class, false),
+        new ColumnSchema("cpuDuration", long.class, false),
+        new ColumnSchema("contentReadDuration", long.class, false),
+        new ColumnSchema("contentWriteDuration", long.class, false),
+        new ColumnSchema("sessionCommitDuration", long.class, false),
+        new ColumnSchema("garbageCollectionDuration", long.class, false)
     ));
 
 
@@ -123,7 +128,12 @@ public class ProcessorStatusDataSource implements 
ResettableDataSource {
             status.getInvocations(),
             status.getProcessingNanos(),
             status.getRunStatus().name(),
-            status.getExecutionNode() == null ? null : status.getExecutionNode().name()
+            status.getExecutionNode() == null ? null : status.getExecutionNode().name(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getCpuDuration(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getContentReadDuration(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getContentWriteDuration(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getSessionCommitDuration(),
+            status.getProcessingPerformanceStatus() == null ? -1 : status.getProcessingPerformanceStatus().getGarbageCollectionDuration()
         };
     }
 }
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
index ab80bea754..d79e2473a2 100644
--- a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++ b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
@@ -502,7 +502,7 @@ class TestQueryNiFiReportingTask {
         assertEquals(4, rows.size());
         // Validate the first row
         Map<String, Object> row = rows.get(0);
-        assertEquals(21, row.size());
+        assertEquals(26, row.size());
         assertEquals(1L, row.get("bytesRead"));
         // Validate the second row
         row = rows.get(1);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
index 39593473c5..f6254a7647 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
@@ -81,6 +81,9 @@ public class ProcessGroupStatusSnapshotDTO implements 
Cloneable {
     private Integer terminatedThreadCount = 0;
     private Long processingNanos = 0L;
 
+    @Schema(description = "Represents the processing performance for all the 
processors in the given process group.")
+    private ProcessingPerformanceStatusDTO processingPerformanceStatus;
+
     /**
      * The id for the process group.
      *
@@ -520,6 +523,14 @@ public class ProcessGroupStatusSnapshotDTO implements 
Cloneable {
         this.processingNanos = processingNanos;
     }
 
+    public ProcessingPerformanceStatusDTO getProcessingPerformanceStatus() {
+        return processingPerformanceStatus;
+    }
+
+    public void setProcessingPerformanceStatus(ProcessingPerformanceStatusDTO 
processingPerformanceStatus) {
+        this.processingPerformanceStatus = processingPerformanceStatus;
+    }
+
     @Override
     public ProcessGroupStatusSnapshotDTO clone() {
         final ProcessGroupStatusSnapshotDTO other = new 
ProcessGroupStatusSnapshotDTO();
@@ -598,6 +609,9 @@ public class ProcessGroupStatusSnapshotDTO implements 
Cloneable {
             other.setProcessGroupStatusSnapshots(childGroups);
         }
 
+
+        other.setProcessingPerformanceStatus(getProcessingPerformanceStatus());
+
         return other;
     }
 
diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessingPerformanceStatusDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessingPerformanceStatusDTO.java
new file mode 100644
index 0000000000..a63416c396
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessingPerformanceStatusDTO.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.xml.bind.annotation.XmlType;
+
+/**
+ * DTO for serializing the performance status of a processor.
+ */
+@XmlType(name = "processingPerformanceStatus")
+public class ProcessingPerformanceStatusDTO implements Cloneable {
+    private String identifier;
+    private long cpuDuration;
+    private long contentReadDuration;
+    private long contentWriteDuration;
+    private long sessionCommitDuration;
+    private long garbageCollectionDuration;
+
+    @Schema(description = "The unique ID of the process group that the 
Processor belongs to")
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public void setIdentifier(String identifier) {
+        this.identifier = identifier;
+    }
+
+    @Schema(description = "The number of nanoseconds has spent on CPU usage in 
the last 5 minutes.")
+    public long getCpuDuration() {
+        return cpuDuration;
+    }
+
+    public void setCpuDuration(long cpuDuration) {
+        this.cpuDuration = cpuDuration;
+    }
+
+    @Schema(description = "The number of nanoseconds has spent to read content 
in the last 5 minutes.")
+    public long getContentReadDuration() {
+        return contentReadDuration;
+    }
+
+    public void setContentReadDuration(long contentReadDuration) {
+        this.contentReadDuration = contentReadDuration;
+    }
+
+    @Schema(description = "The number of nanoseconds has spent to write 
content in the last 5 minutes.")
+    public long getContentWriteDuration() {
+        return contentWriteDuration;
+    }
+
+    public void setContentWriteDuration(long contentWriteDuration) {
+        this.contentWriteDuration = contentWriteDuration;
+    }
+
+    @Schema(description = "The number of nanoseconds has spent running to 
commit sessions the last 5 minutes.")
+    public long getSessionCommitDuration() {
+        return sessionCommitDuration;
+    }
+
+    public void setSessionCommitDuration(long sessionCommitDuration) {
+        this.sessionCommitDuration = sessionCommitDuration;
+    }
+
+    @Schema(description = "The number of nanoseconds has spent running garbage 
collection in the last 5 minutes.")
+    public long getGarbageCollectionDuration() {
+        return garbageCollectionDuration;
+    }
+
+    public void setGarbageCollectionDuration(long garbageCollectionDuration) {
+        this.garbageCollectionDuration = garbageCollectionDuration;
+    }
+
+    @Override
+    public ProcessingPerformanceStatusDTO clone() {
+        final ProcessingPerformanceStatusDTO clonedObj = new 
ProcessingPerformanceStatusDTO();
+
+        clonedObj.identifier = identifier;
+        clonedObj.cpuDuration = cpuDuration;
+        clonedObj.contentReadDuration = contentReadDuration;
+        clonedObj.contentWriteDuration = contentWriteDuration;
+        clonedObj.sessionCommitDuration = sessionCommitDuration;
+        clonedObj.garbageCollectionDuration = garbageCollectionDuration;
+        return clonedObj;
+    }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
index 2e9d1ac245..2eaa89488c 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
@@ -53,6 +53,9 @@ public class ProcessorStatusSnapshotDTO implements Cloneable {
     private Integer activeThreadCount = 0;
     private Integer terminatedThreadCount = 0;
 
+    @Schema(description = "Represents the processor's processing performance.")
+    private ProcessingPerformanceStatusDTO processingPerformanceStatus;
+
     /* getters / setters */
     /**
      * @return The processor id
@@ -295,6 +298,14 @@ public class ProcessorStatusSnapshotDTO implements 
Cloneable {
         this.tasksDurationNanos = taskNanos;
     }
 
+    public ProcessingPerformanceStatusDTO getProcessingPerformanceStatus() {
+        return processingPerformanceStatus;
+    }
+
+    public void setProcessingPerformanceStatus(ProcessingPerformanceStatusDTO 
processingPerformanceStatus) {
+        this.processingPerformanceStatus = processingPerformanceStatus;
+    }
+
     @Override
     public ProcessorStatusSnapshotDTO clone() {
         final ProcessorStatusSnapshotDTO other = new 
ProcessorStatusSnapshotDTO();
@@ -322,6 +333,8 @@ public class ProcessorStatusSnapshotDTO implements 
Cloneable {
         other.setWritten(getWritten());
         other.setTasks(getTasks());
 
+        other.setProcessingPerformanceStatus(getProcessingPerformanceStatus());
+
         return other;
     }
 }
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessingPerformanceStatusMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessingPerformanceStatusMerger.java
new file mode 100644
index 0000000000..786c9e1cdd
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessingPerformanceStatusMerger.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.web.api.dto.status.ProcessingPerformanceStatusDTO;
+
+public class ProcessingPerformanceStatusMerger {
+    public static void mergeStatus(final ProcessingPerformanceStatusDTO target, final ProcessingPerformanceStatusDTO toMerge) {
+        if (target == null || toMerge == null) {
+            return;
+        }
+
+        target.setIdentifier(toMerge.getIdentifier());
+        target.setCpuDuration(target.getCpuDuration() + toMerge.getCpuDuration());
+        target.setContentReadDuration(target.getContentReadDuration() + toMerge.getContentReadDuration());
+        target.setContentWriteDuration(target.getContentWriteDuration() + toMerge.getContentWriteDuration());
+        target.setSessionCommitDuration(target.getSessionCommitDuration() + toMerge.getSessionCommitDuration());
+        target.setGarbageCollectionDuration(target.getGarbageCollectionDuration() + toMerge.getGarbageCollectionDuration());
+    }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index b20df287aa..221ab41600 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -177,6 +177,9 @@ public class StatusMerger {
         target.setTerminatedThreadCount(target.getTerminatedThreadCount() + 
toMerge.getTerminatedThreadCount());
 
         target.setProcessingNanos(target.getProcessingNanos() + 
toMerge.getProcessingNanos());
+
+        
ProcessingPerformanceStatusMerger.mergeStatus(target.getProcessingPerformanceStatus(),
 toMerge.getProcessingPerformanceStatus());
+
         updatePrettyPrintedFields(target);
 
         // connection status
@@ -454,6 +457,9 @@ public class StatusMerger {
         target.setTasksDurationNanos(target.getTasksDurationNanos() + 
toMerge.getTasksDurationNanos());
         target.setActiveThreadCount(target.getActiveThreadCount() + 
toMerge.getActiveThreadCount());
         target.setTerminatedThreadCount(target.getTerminatedThreadCount() + 
toMerge.getTerminatedThreadCount());
+
+        
ProcessingPerformanceStatusMerger.mergeStatus(target.getProcessingPerformanceStatus(),
 toMerge.getProcessingPerformanceStatus());
+
         updatePrettyPrintedFields(target);
     }
 
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index 373b21babc..1fc75efa5f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -40,6 +40,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.LoadBalanceStatus;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.controller.status.RunStatus;
@@ -162,6 +163,8 @@ public abstract class AbstractEventAccess implements 
EventAccess {
         int flowFilesTransferred = 0;
         long bytesTransferred = 0;
         long processingNanos = 0;
+        final ProcessingPerformanceStatus performanceStatus = new 
ProcessingPerformanceStatus();
+        performanceStatus.setIdentifier(group.getIdentifier());
 
         final boolean populateChildStatuses = currentDepth <= 
recursiveStatusDepth;
 
@@ -192,6 +195,16 @@ public abstract class AbstractEventAccess implements 
EventAccess {
             bytesSent += procStat.getBytesSent();
 
             processingNanos += procStat.getProcessingNanos();
+
+            final ProcessingPerformanceStatus processorPerformanceStatus = 
procStat.getProcessingPerformanceStatus();
+
+            if (processorPerformanceStatus != null) {
+                
performanceStatus.setCpuDuration(performanceStatus.getCpuDuration() + 
processorPerformanceStatus.getCpuDuration());
+                
performanceStatus.setContentReadDuration(performanceStatus.getContentReadDuration()
 + processorPerformanceStatus.getContentReadDuration());
+                
performanceStatus.setContentWriteDuration(performanceStatus.getContentWriteDuration()
 + processorPerformanceStatus.getContentWriteDuration());
+                
performanceStatus.setSessionCommitDuration(performanceStatus.getSessionCommitDuration()
 + processorPerformanceStatus.getSessionCommitDuration());
+                
performanceStatus.setGarbageCollectionDuration(performanceStatus.getGarbageCollectionDuration()
 + processorPerformanceStatus.getGarbageCollectionDuration());
+            }
         }
 
         // set status for local child groups
@@ -226,6 +239,16 @@ public abstract class AbstractEventAccess implements 
EventAccess {
             bytesTransferred += childGroupStatus.getBytesTransferred();
 
             processingNanos += childGroupStatus.getProcessingNanos();
+
+            final ProcessingPerformanceStatus childGroupPerformanceStatus = 
childGroupStatus.getProcessingPerformanceStatus();
+
+            if (childGroupPerformanceStatus != null) {
+                
performanceStatus.setCpuDuration(performanceStatus.getCpuDuration() + 
childGroupPerformanceStatus.getCpuDuration());
+                
performanceStatus.setContentReadDuration(performanceStatus.getContentReadDuration()
 + childGroupPerformanceStatus.getContentReadDuration());
+                
performanceStatus.setContentWriteDuration(performanceStatus.getContentWriteDuration()
 + childGroupPerformanceStatus.getContentWriteDuration());
+                
performanceStatus.setSessionCommitDuration(performanceStatus.getSessionCommitDuration()
 + childGroupPerformanceStatus.getSessionCommitDuration());
+                
performanceStatus.setGarbageCollectionDuration(performanceStatus.getGarbageCollectionDuration()
 + childGroupPerformanceStatus.getGarbageCollectionDuration());
+            }
         }
 
         // set status for remote child groups
@@ -513,6 +536,7 @@ public abstract class AbstractEventAccess implements 
EventAccess {
         status.setFlowFilesTransferred(flowFilesTransferred);
         status.setBytesTransferred(bytesTransferred);
         status.setProcessingNanos(processingNanos);
+        status.setProcessingPerformanceStatus(performanceStatus);
 
         final VersionControlInformation vci = 
group.getVersionControlInformation();
         if (vci != null) {
@@ -660,6 +684,9 @@ public abstract class AbstractEventAccess implements 
EventAccess {
             if (isProcessorAuthorized) {
                 status.setCounters(flowFileEvent.getCounters());
             }
+
+            final ProcessingPerformanceStatus performanceStatus = 
PerformanceMetricsUtil.getPerformanceMetrics(flowFileEvent, procNode);
+            status.setProcessingPerformanceStatus(performanceStatus);
         }
 
         // Determine the run status and get any validation error... only 
validating while STOPPED
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
new file mode 100644
index 0000000000..78731d93b0
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting;
+
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
+
+/**
+ * Helper for translating the timing values carried by a FlowFileEvent
+ * into a ProcessingPerformanceStatus.
+ */
+public class PerformanceMetricsUtil {
+
+    /**
+     * Builds a new ProcessingPerformanceStatus from the given event.
+     *
+     * @param fileEvent event supplying the CPU, content read, content write, session commit and garbage collection values
+     * @param processorNode processor whose Process Group identifier is used as the identifier of the returned status
+     * @return a newly created status populated from the event
+     */
+    public static ProcessingPerformanceStatus getPerformanceMetrics(final FlowFileEvent fileEvent, final ProcessorNode processorNode) {
+        final ProcessingPerformanceStatus metrics = new ProcessingPerformanceStatus();
+
+        // The identifier is taken from the Process Group that owns the processor
+        metrics.setIdentifier(processorNode.getProcessGroup().getIdentifier());
+
+        // Durations read from the nanosecond-named getters of the event
+        metrics.setCpuDuration(fileEvent.getCpuNanoseconds());
+        metrics.setContentReadDuration(fileEvent.getContentReadNanoseconds());
+        metrics.setContentWriteDuration(fileEvent.getContentWriteNanoseconds());
+        metrics.setSessionCommitDuration(fileEvent.getSessionCommitNanoseconds());
+
+        // NOTE(review): this value comes from a millisecond-named getter while the others are nanosecond-named; confirm the expected unit
+        metrics.setGarbageCollectionDuration(fileEvent.getGargeCollectionMillis());
+
+        return metrics;
+    }
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 647c38d449..27daac467f 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -107,6 +107,7 @@ import org.apache.nifi.controller.state.SortedStateUtils;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
@@ -222,6 +223,7 @@ import org.apache.nifi.web.api.dto.status.PortStatusDTO;
 import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessingPerformanceStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
@@ -1051,6 +1053,11 @@ public final class DtoFactory {
        
snapshot.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
        
snapshot.setTerminatedThreadCount(processGroupStatus.getTerminatedThreadCount());
 
+       final ProcessingPerformanceStatus performanceStatus = 
processGroupStatus.getProcessingPerformanceStatus();
+       if (performanceStatus != null) {
+           
snapshot.setProcessingPerformanceStatus(createProcessingPerformanceStatusDTO(performanceStatus));
+       }
+
        StatusMerger.updatePrettyPrintedFields(snapshot);
        return processGroupStatusDto;
    }
@@ -1268,6 +1275,11 @@ public final class DtoFactory {
        
snapshot.setTerminatedThreadCount(procStatus.getTerminatedThreadCount());
        snapshot.setType(procStatus.getType());
 
+       final ProcessingPerformanceStatus performanceStatus = 
procStatus.getProcessingPerformanceStatus();
+       if (performanceStatus != null) {
+           
snapshot.setProcessingPerformanceStatus(createProcessingPerformanceStatusDTO(procStatus.getProcessingPerformanceStatus()));
+       }
+
        StatusMerger.updatePrettyPrintedFields(snapshot);
        return dto;
    }
@@ -5102,4 +5114,18 @@ public final class DtoFactory {
     public void setExtensionManager(ExtensionManager extensionManager) {
         this.extensionManager = extensionManager;
     }
+
+    /**
+     * Creates a ProcessingPerformanceStatusDTO carrying the identifier and duration values
+     * of the given ProcessingPerformanceStatus.
+     *
+     * @param performanceStatus source status to copy from; must not be null
+     * @return a new DTO populated from the source status
+     */
+    private ProcessingPerformanceStatusDTO createProcessingPerformanceStatusDTO(final ProcessingPerformanceStatus performanceStatus) {
+        final ProcessingPerformanceStatusDTO performanceStatusDTO = new ProcessingPerformanceStatusDTO();
+
+        performanceStatusDTO.setIdentifier(performanceStatus.getIdentifier());
+        performanceStatusDTO.setCpuDuration(performanceStatus.getCpuDuration());
+        // Read from the source status: reading the getter of the freshly created DTO would drop the measured value
+        performanceStatusDTO.setContentReadDuration(performanceStatus.getContentReadDuration());
+        performanceStatusDTO.setContentWriteDuration(performanceStatus.getContentWriteDuration());
+        performanceStatusDTO.setSessionCommitDuration(performanceStatus.getSessionCommitDuration());
+        performanceStatusDTO.setGarbageCollectionDuration(performanceStatus.getGarbageCollectionDuration());
+
+        return performanceStatusDTO;
+    }
 }


Reply via email to