This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c1403db4f0 NIFI-14586, NIFI-14587: Expose Processors' Performance 
Metrics in the UI as part of the Status History; in doing so, I discovered a 
bug in which the GC time was not being tracked properly and fixed it. Sorted 
counter values lexicographically.
c1403db4f0 is described below

commit c1403db4f04a47b601073049a1efbddac91274e9
Author: Mark Payne <[email protected]>
AuthorDate: Tue May 20 15:07:45 2025 -0400

    NIFI-14586, NIFI-14587: Expose Processors' Performance Metrics in the UI as 
part of the Status History; in doing so, I discovered a bug in which the GC 
time was not being tracked properly and fixed it. Sorted counter values 
lexicographically.
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #9963.
---
 .../repository/metrics/EventSumValue.java          |   2 +
 .../apache/nifi/reporting/AbstractEventAccess.java |  41 +++--
 .../nifi/reporting/PerformanceMetricsUtil.java     |  37 -----
 .../status/history/StatusHistoryUtil.java          |  11 +-
 .../status/history/ProcessorStatusDescriptor.java  | 170 +++++++++++++++++++++
 5 files changed, 206 insertions(+), 55 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
index 316623d118..62ca00eacf 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
@@ -150,6 +150,7 @@ public class EventSumValue {
             this.contentReadNanos += other.contentReadNanos;
             this.contentWriteNanos += other.contentWriteNanos;
             this.sessionCommitNanos += other.sessionCommitNanos;
+            this.gcMillis += other.gcMillis;
 
             final Map<String, Long> eventCounters = other.counters;
             if (eventCounters != null) {
@@ -192,6 +193,7 @@ public class EventSumValue {
             this.contentReadNanos -= other.contentReadNanos;
             this.contentWriteNanos -= other.contentWriteNanos;
             this.sessionCommitNanos -= other.sessionCommitNanos;
+            this.gcMillis -= other.gcMillis;
 
             final Map<String, Long> eventCounters = other.counters;
             if (eventCounters != null) {
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index 1fc75efa5f..2345472f76 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -685,25 +685,15 @@ public abstract class AbstractEventAccess implements 
EventAccess {
                 status.setCounters(flowFileEvent.getCounters());
             }
 
-            final ProcessingPerformanceStatus performanceStatus = 
PerformanceMetricsUtil.getPerformanceMetrics(flowFileEvent, procNode);
-            status.setProcessingPerformanceStatus(performanceStatus);
+            final ProcessingPerformanceStatus perfStatus = 
createProcessingPerformanceStatus(flowFileEvent, procNode);
+            status.setProcessingPerformanceStatus(perfStatus);
         }
 
         // Determine the run status and get any validation error... only 
validating while STOPPED
         // is a trade-off we are willing to make, even though processor 
validity could change due to
         // environmental conditions (property configured with a file path and 
the file being externally
         // removed). This saves on validation costs that would be unnecessary 
most of the time.
-        if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
-            status.setRunStatus(RunStatus.Disabled);
-        } else if 
(ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
-            status.setRunStatus(RunStatus.Running);
-        } else if (procNode.getValidationStatus() == 
ValidationStatus.VALIDATING) {
-            status.setRunStatus(RunStatus.Validating);
-        } else if (procNode.getValidationStatus() == ValidationStatus.INVALID 
&& procNode.getActiveThreadCount() == 0) {
-            status.setRunStatus(RunStatus.Invalid);
-        } else {
-            status.setRunStatus(RunStatus.Stopped);
-        }
+        status.setRunStatus(determineRunStatus(procNode));
 
         status.setExecutionNode(procNode.getExecutionNode());
         status.setTerminatedThreadCount(procNode.getTerminatedThreadCount());
@@ -712,6 +702,31 @@ public abstract class AbstractEventAccess implements 
EventAccess {
         return status;
     }
 
+    /**
+     * Creates the performance status that is attached to a Processor's status.
+     *
+     * @param flowFileEvent the event whose timing values are copied into the result
+     * @param procNode the Processor whose identifier is recorded on the result
+     * @return a newly created, fully populated ProcessingPerformanceStatus
+     */
+    private ProcessingPerformanceStatus createProcessingPerformanceStatus(final FlowFileEvent flowFileEvent, final ProcessorNode procNode) {
+        final ProcessingPerformanceStatus performance = new ProcessingPerformanceStatus();
+        performance.setIdentifier(procNode.getIdentifier());
+
+        // These durations come from the event's nanosecond getters.
+        performance.setCpuDuration(flowFileEvent.getCpuNanoseconds());
+        performance.setContentReadDuration(flowFileEvent.getContentReadNanoseconds());
+        performance.setContentWriteDuration(flowFileEvent.getContentWriteNanoseconds());
+        performance.setSessionCommitDuration(flowFileEvent.getSessionCommitNanoseconds());
+
+        // Garbage collection comes from the event's millisecond getter, unlike the values above.
+        performance.setGarbageCollectionDuration(flowFileEvent.getGargeCollectionMillis());
+
+        return performance;
+    }
+
+    /**
+     * Translates the scheduled state and validation status of a Processor into a RunStatus.
+     * The validation status is only consulted when the Processor is neither disabled nor running.
+     *
+     * @param procNode the Processor to inspect
+     * @return the run status to report for the Processor
+     */
+    private RunStatus determineRunStatus(final ProcessorNode procNode) {
+        if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
+            return RunStatus.Disabled;
+        }
+
+        if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
+            return RunStatus.Running;
+        }
+
+        if (procNode.getValidationStatus() == ValidationStatus.VALIDATING) {
+            return RunStatus.Validating;
+        }
+
+        // Invalid is reported only when no threads are active; otherwise the Processor is treated as stopped.
+        final boolean invalidAndIdle = procNode.getValidationStatus() == ValidationStatus.INVALID && procNode.getActiveThreadCount() == 0;
+        return invalidAndIdle ? RunStatus.Invalid : RunStatus.Stopped;
+    }
+
     /**
      * Returns the status of all components in the controller. This request is
      * not in the context of a user so the results will be unfiltered.
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
deleted file mode 100644
index 78731d93b0..0000000000
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting;
-
-import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.controller.repository.FlowFileEvent;
-import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
-
-public class PerformanceMetricsUtil {
-
-    public static ProcessingPerformanceStatus getPerformanceMetrics(final 
FlowFileEvent fileEvent, final ProcessorNode processorNode) {
-        final ProcessingPerformanceStatus newMetrics = new 
ProcessingPerformanceStatus();
-
-        
newMetrics.setIdentifier(processorNode.getProcessGroup().getIdentifier());
-        newMetrics.setCpuDuration(fileEvent.getCpuNanoseconds());
-        
newMetrics.setContentReadDuration(fileEvent.getContentReadNanoseconds());
-        
newMetrics.setContentWriteDuration(fileEvent.getContentWriteNanoseconds());
-        
newMetrics.setSessionCommitDuration(fileEvent.getSessionCommitNanoseconds());
-        
newMetrics.setGarbageCollectionDuration(fileEvent.getGargeCollectionMillis());
-
-        return newMetrics;
-    }
-}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
index f15733e344..4091b7a746 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
@@ -23,16 +23,16 @@ import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public class StatusHistoryUtil {
 
@@ -79,7 +79,7 @@ public class StatusHistoryUtil {
 
     public static List<StatusDescriptorDTO> createFieldDescriptorDtos(final 
Collection<MetricDescriptor<?>> metricDescriptors) {
         final StatusDescriptorDTO[] standardMetricDescriptors = new 
StatusDescriptorDTO[metricDescriptors.size()];
-        final List<StatusDescriptorDTO> counterMetricDescriptors = new 
LinkedList<>();
+        final List<StatusDescriptorDTO> counterMetricDescriptors = new 
ArrayList<>();
 
         for (final MetricDescriptor<?> metricDescriptor : metricDescriptors) {
             if (metricDescriptor instanceof StandardMetricDescriptor) {
@@ -91,9 +91,10 @@ public class StatusHistoryUtil {
             }
         }
 
-        // Ordered standard metric descriptors are added first than counter 
metric descriptors in the order of appearance.
+        // Ordered standard metric descriptors are added first, then counter 
metric descriptors in lexicographical order of their label.
+        
counterMetricDescriptors.sort(Comparator.comparing(StatusDescriptorDTO::getLabel));
         final List<StatusDescriptorDTO> result = new 
ArrayList<>(metricDescriptors.size());
-        
result.addAll(Arrays.asList(standardMetricDescriptors).stream().filter(i -> i 
!= null).collect(Collectors.toList()));
+        
result.addAll(Arrays.stream(standardMetricDescriptors).filter(Objects::nonNull).toList());
         result.addAll(counterMetricDescriptors);
         return result;
     }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
index feff921299..e701b2be2f 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
@@ -17,11 +17,13 @@
 
 package org.apache.nifi.controller.status.history;
 
+import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 public enum ProcessorStatusDescriptor {
     BYTES_READ(
@@ -164,8 +166,176 @@ public enum ProcessorStatusDescriptor {
                 }
             },
         true
+    ),
+
+    CPU_MILLIS(
+        "cpuTime",
+        "CPU Time (5 mins)",
+        "The total amount of time that the Processor has used the CPU in the 
past 5 minutes. " +
+        "Note, this metric may be unavailable, depending on configuration of 
the `nifi.performance.tracking.percentage` property.",
+        Formatter.DURATION,
+        status -> nanosToMillis(status, 
ProcessingPerformanceStatus::getCpuDuration)
+    ),
+
+    CPU_PERCENTAGE(
+        "cpuPercentage",
+        "CPU Percentage (5 mins)",
+        "Of the time that the Processor was running in the past 5 minutes, the 
percentage of that time that the Processor was using the CPU. " +
+        "Note, this metric may be unavailable, depending on configuration of 
the `nifi.performance.tracking.percentage` property.",
+        Formatter.COUNT,
+        status -> nanosValue(status, 
ProcessingPerformanceStatus::getCpuDuration) * 100 / Math.max(1, 
status.getProcessingNanos()),
+        processingPercentage(CPU_MILLIS.getDescriptor()),
+        true
+    ),
+
+    CONTENT_REPO_READ_MILLIS(
+        "contentRepoReadTime",
+        "Content Repo Read Time (5 mins)",
+        "The amount of time that the Processor has spent reading from the 
Content Repository in the past 5 minutes. " +
+        "Note, this metric may be unavailable, depending on configuration of 
the `nifi.performance.tracking.percentage` property.",
+        Formatter.DURATION,
+        status -> nanosToMillis(status, 
ProcessingPerformanceStatus::getContentReadDuration)
+    ),
+
+    CONTENT_REPO_READ_PERCENTAGE(
+        "contentRepoReadPercentage",
+        "Content Repo Read Percentage (5 mins)",
+        "Of the time that the Processor was running in the past 5 minutes, the 
percentage of that time that the Processor was reading from the Content 
Repository. " +
+        "Note, this metric may be unavailable, depending on configuration of 
the `nifi.performance.tracking.percentage` property.",
+        Formatter.COUNT,
+        status -> nanosValue(status, 
ProcessingPerformanceStatus::getContentReadDuration) * 100 / Math.max(1, 
status.getProcessingNanos()),
+        processingPercentage(CONTENT_REPO_READ_MILLIS.getDescriptor()),
+        true
+    ),
+
+    CONTENT_REPO_WRITE_MILLIS(
+        "contentRepoWriteTime",
+        "Content Repo Write Time (5 mins)",
+        "The total amount of time that the Processor has spent writing to the 
Content Repository in the past 5 minutes. " +
+        "Note, this metric may be unavailable, depending on configuration of 
the `nifi.performance.tracking.percentage` property.",
+        Formatter.DURATION,
+        status -> nanosToMillis(status, 
ProcessingPerformanceStatus::getContentWriteDuration)
+    ),
+
+    CONTENT_REPO_WRITE_PERCENTAGE(
+        "contentRepoWritePercentage",
+        "Content Repo Write Percentage (5 mins)",
+        "Of the time that the Processor was running in the past 5 minutes, the 
percentage of that time that the Processor was writing to the Content 
Repository. " +
+        "Note, this metric may be unavailable, depending on configuration of 
the `nifi.performance.tracking.percentage` property.",
+        Formatter.COUNT,
+        status -> nanosValue(status, 
ProcessingPerformanceStatus::getContentWriteDuration) * 100 / Math.max(1, 
status.getProcessingNanos()),
+        processingPercentage(CONTENT_REPO_WRITE_MILLIS.getDescriptor()),
+        true
+    ),
+
+    SESSION_COMMIT_MILLIS(
+        "sessionCommitTime",
+        "Session Commit Time (5 mins)",
+        "The total amount of time that the Processor has spent waiting for the 
framework to commit its ProcessSession in the past 5 minutes. " +
+        "Note, this metric may be unavailable, depending on configuration of 
the `nifi.performance.tracking.percentage` property.",
+        Formatter.DURATION,
+        status -> nanosToMillis(status, 
ProcessingPerformanceStatus::getSessionCommitDuration)
+    ),
+
+    // Share of the Processor's processing time that was spent in session commit.
+    // Per-snapshot value: session commit nanos * 100 / processing nanos (divisor clamped to at least 1).
+    // Reduced across snapshots by processingPercentage over the SESSION_COMMIT_MILLIS metric.
+    SESSION_COMMIT_PERCENTAGE(
+        "sessionCommitPercentage",
+        "Session Commit Percentage (5 mins)",
+        "Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was waiting for the framework to commit its ProcessSession. " +
+        "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
+        Formatter.COUNT,
+        status -> nanosValue(status, ProcessingPerformanceStatus::getSessionCommitDuration) * 100 / Math.max(1, status.getProcessingNanos()),
+        processingPercentage(SESSION_COMMIT_MILLIS.getDescriptor()),
+        true
+    ),
+
+    GARBAGE_COLLECTION_MILLIS(
+        "garbageCollectionTime",
+        "Garbage Collection Time (5 mins)",
+        "The total amount of time that the Processor has spent blocked on 
Garbage Collection in the past 5 minutes. " +
+        "Note, this metric may be unavailable, depending on configuration of 
the `nifi.performance.tracking.percentage` property.",
+        Formatter.DURATION,
+        status -> {
+            final ProcessingPerformanceStatus perfStatus = 
status.getProcessingPerformanceStatus();
+            if (perfStatus == null) {
+                return 0L;
+            }
+            // Garbage Collection is reported in milliseconds rather than 
nanos.
+            return perfStatus.getGarbageCollectionDuration();
+        }
+    ),
+
+    // Share of the Processor's processing time that was spent blocked on Garbage Collection.
+    // Reduced across snapshots by processingPercentage over the GARBAGE_COLLECTION_MILLIS metric.
+    GARBAGE_COLLECTION_PERCENTAGE(
+        "garbageCollectionPercentage",
+        "Garbage Collection Percentage (5 mins)",
+        "Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor spent blocked on Garbage Collection. " +
+        "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
+        Formatter.COUNT,
+        status -> {
+            final ProcessingPerformanceStatus perfStatus = status.getProcessingPerformanceStatus();
+            if (perfStatus == null) {
+                return 0L;
+            }
+
+            // Garbage Collection is reported in milliseconds while the processing time is in nanoseconds,
+            // so both must be brought to the same unit before the ratio is computed.
+            final long gcMillis = perfStatus.getGarbageCollectionDuration();
+            final long gcNanos = TimeUnit.MILLISECONDS.toNanos(gcMillis);
+            final long percentage = gcNanos * 100 / Math.max(1, status.getProcessingNanos());
+            if (percentage == 0 && gcMillis > 0) {
+                // If the value is non-zero but less than 1%, we want to return 1%.
+                return 1L;
+            }
+            return percentage;
+        },
+        processingPercentage(GARBAGE_COLLECTION_MILLIS.getDescriptor()),
+        true
+    );
 
+    /**
+     * Reads a nanosecond metric from the Processor's performance status and converts it to milliseconds.
+     *
+     * @param procStatus the Processor status that may carry a performance status
+     * @param metricTransform extracts the nanosecond value of interest from the performance status
+     * @return 0 if no performance status is present; 1 if the value is positive but below one millisecond;
+     *         otherwise the value truncated to whole milliseconds
+     */
+    private static long nanosToMillis(final ProcessorStatus procStatus, final Function<ProcessingPerformanceStatus, Long> metricTransform) {
+        final ProcessingPerformanceStatus performance = procStatus.getProcessingPerformanceStatus();
+        if (performance == null) {
+            return 0;
+        }
+
+        final long elapsedNanos = metricTransform.apply(performance);
+        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
+
+        // Round a positive sub-millisecond value up to 1ms so that it is not reported as zero.
+        final boolean truncatedToZero = elapsedMillis == 0 && elapsedNanos > 0;
+        return truncatedToZero ? 1 : elapsedMillis;
+    }
+
+    /**
+     * Creates a reducer that combines several snapshots into a single percentage: the sum of the given
+     * metric across all snapshots, relative to the sum of the TASK_NANOS metric across the same snapshots.
+     * The given metric is summed as-is and compared against the task time converted to milliseconds, so
+     * the descriptor is expected to report milliseconds.
+     *
+     * @param metricDescriptor descriptor of the metric to express as a percentage of processing time
+     * @return a reducer yielding the percentage, or 0 when the total processing time is below one millisecond
+     */
+    private static ValueReducer<StatusSnapshot, Long> processingPercentage(final MetricDescriptor<?> metricDescriptor) {
+        return new ValueReducer<>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long procNanos = 0L;
+                long metricMillis = 0L;
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long millis = snapshot.getStatusMetric(metricDescriptor);
+                    metricMillis += millis;
+
+                    final long taskNanos = snapshot.getStatusMetric(ProcessorStatusDescriptor.TASK_NANOS.getDescriptor());
+                    procNanos += taskNanos;
+                }
+
+                // Guard on the converted value rather than the raw nanos: any total below 1ms truncates
+                // to 0 millis and would otherwise cause a division by zero.
+                final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
+                if (procMillis == 0) {
+                    return 0L;
+                }
+
+                return metricMillis * 100 / procMillis;
+            }
+        };
+    }
+
+    /**
+     * Reads a raw metric value from the Processor's performance status without any unit conversion.
+     *
+     * @param procStatus the Processor status that may carry a performance status
+     * @param metricTransform extracts the value of interest from the performance status
+     * @return the extracted value, or 0 if the Processor status has no performance status
+     */
+    private static long nanosValue(final ProcessorStatus procStatus, final Function<ProcessingPerformanceStatus, Long> metricTransform) {
+        final ProcessingPerformanceStatus performance = procStatus.getProcessingPerformanceStatus();
+        return performance == null ? 0L : metricTransform.apply(performance);
+    }
 
 
     private final MetricDescriptor<ProcessorStatus> descriptor;

Reply via email to