This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c1403db4f0 NIFI-14586, NIFI-14587: Expose Processors' Performance
Metrics in the UI as part of the Status History; in doing so, I discovered a
bug in which the GC time was not being tracked properly and fixed it. Sorted
counter values lexicographically.
c1403db4f0 is described below
commit c1403db4f04a47b601073049a1efbddac91274e9
Author: Mark Payne <[email protected]>
AuthorDate: Tue May 20 15:07:45 2025 -0400
NIFI-14586, NIFI-14587: Expose Processors' Performance Metrics in the UI as
part of the Status History; in doing so, I discovered a bug in which the GC
time was not being tracked properly and fixed it. Sorted counter values
lexicographically.
Signed-off-by: Pierre Villard <[email protected]>
This closes #9963.
---
.../repository/metrics/EventSumValue.java | 2 +
.../apache/nifi/reporting/AbstractEventAccess.java | 41 +++--
.../nifi/reporting/PerformanceMetricsUtil.java | 37 -----
.../status/history/StatusHistoryUtil.java | 11 +-
.../status/history/ProcessorStatusDescriptor.java | 170 +++++++++++++++++++++
5 files changed, 206 insertions(+), 55 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
index 316623d118..62ca00eacf 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
@@ -150,6 +150,7 @@ public class EventSumValue {
this.contentReadNanos += other.contentReadNanos;
this.contentWriteNanos += other.contentWriteNanos;
this.sessionCommitNanos += other.sessionCommitNanos;
+ this.gcMillis += other.gcMillis;
final Map<String, Long> eventCounters = other.counters;
if (eventCounters != null) {
@@ -192,6 +193,7 @@ public class EventSumValue {
this.contentReadNanos -= other.contentReadNanos;
this.contentWriteNanos -= other.contentWriteNanos;
this.sessionCommitNanos -= other.sessionCommitNanos;
+ this.gcMillis -= other.gcMillis;
final Map<String, Long> eventCounters = other.counters;
if (eventCounters != null) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index 1fc75efa5f..2345472f76 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -685,25 +685,15 @@ public abstract class AbstractEventAccess implements
EventAccess {
status.setCounters(flowFileEvent.getCounters());
}
- final ProcessingPerformanceStatus performanceStatus =
PerformanceMetricsUtil.getPerformanceMetrics(flowFileEvent, procNode);
- status.setProcessingPerformanceStatus(performanceStatus);
+ final ProcessingPerformanceStatus perfStatus =
createProcessingPerformanceStatus(flowFileEvent, procNode);
+ status.setProcessingPerformanceStatus(perfStatus);
}
// Determine the run status and get any validation error... only
validating while STOPPED
// is a trade-off we are willing to make, even though processor
validity could change due to
// environmental conditions (property configured with a file path and
the file being externally
// removed). This saves on validation costs that would be unnecessary
most of the time.
- if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
- status.setRunStatus(RunStatus.Disabled);
- } else if
(ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
- status.setRunStatus(RunStatus.Running);
- } else if (procNode.getValidationStatus() ==
ValidationStatus.VALIDATING) {
- status.setRunStatus(RunStatus.Validating);
- } else if (procNode.getValidationStatus() == ValidationStatus.INVALID
&& procNode.getActiveThreadCount() == 0) {
- status.setRunStatus(RunStatus.Invalid);
- } else {
- status.setRunStatus(RunStatus.Stopped);
- }
+ status.setRunStatus(determineRunStatus(procNode));
status.setExecutionNode(procNode.getExecutionNode());
status.setTerminatedThreadCount(procNode.getTerminatedThreadCount());
@@ -712,6 +702,31 @@ public abstract class AbstractEventAccess implements
EventAccess {
return status;
}
+    /**
+     * Builds the performance portion of a Processor's status from the given event.
+     * The resulting status is identified by the Processor's own identifier and carries the
+     * CPU, content read, content write, session commit and garbage collection durations
+     * exactly as the event reports them.
+     *
+     * @param flowFileEvent the event supplying the duration values
+     * @param procNode the Processor whose identifier is assigned to the status
+     * @return a newly created, fully populated ProcessingPerformanceStatus
+     */
+    private ProcessingPerformanceStatus createProcessingPerformanceStatus(final FlowFileEvent flowFileEvent, final ProcessorNode procNode) {
+        final ProcessingPerformanceStatus performance = new ProcessingPerformanceStatus();
+        performance.setIdentifier(procNode.getIdentifier());
+
+        // These four values come from the event's *Nanoseconds getters and are passed through unconverted.
+        performance.setCpuDuration(flowFileEvent.getCpuNanoseconds());
+        performance.setContentReadDuration(flowFileEvent.getContentReadNanoseconds());
+        performance.setContentWriteDuration(flowFileEvent.getContentWriteNanoseconds());
+        performance.setSessionCommitDuration(flowFileEvent.getSessionCommitNanoseconds());
+
+        // Unlike the values above, the garbage collection value comes from a *Millis getter.
+        performance.setGarbageCollectionDuration(flowFileEvent.getGargeCollectionMillis());
+        return performance;
+    }
+
+    /**
+     * Derives the RunStatus to report for the given Processor.
+     * The checks are applied in order of precedence: a DISABLED scheduled state yields Disabled,
+     * a RUNNING scheduled state yields Running, a VALIDATING validation status yields Validating,
+     * and an INVALID validation status with no active threads yields Invalid.
+     * Anything else is reported as Stopped.
+     *
+     * @param procNode the Processor to inspect
+     * @return the RunStatus matching the Processor's current state
+     */
+    private RunStatus determineRunStatus(final ProcessorNode procNode) {
+        if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
+            return RunStatus.Disabled;
+        }
+        if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
+            return RunStatus.Running;
+        }
+        if (procNode.getValidationStatus() == ValidationStatus.VALIDATING) {
+            return RunStatus.Validating;
+        }
+
+        // An invalid Processor is only reported as Invalid once it has no active threads.
+        final boolean invalidAndIdle = procNode.getValidationStatus() == ValidationStatus.INVALID && procNode.getActiveThreadCount() == 0;
+        return invalidAndIdle ? RunStatus.Invalid : RunStatus.Stopped;
+    }
+
/**
* Returns the status of all components in the controller. This request is
* not in the context of a user so the results will be unfiltered.
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
deleted file mode 100644
index 78731d93b0..0000000000
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting;
-
-import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.controller.repository.FlowFileEvent;
-import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
-
-public class PerformanceMetricsUtil {
-
- public static ProcessingPerformanceStatus getPerformanceMetrics(final
FlowFileEvent fileEvent, final ProcessorNode processorNode) {
- final ProcessingPerformanceStatus newMetrics = new
ProcessingPerformanceStatus();
-
-
newMetrics.setIdentifier(processorNode.getProcessGroup().getIdentifier());
- newMetrics.setCpuDuration(fileEvent.getCpuNanoseconds());
-
newMetrics.setContentReadDuration(fileEvent.getContentReadNanoseconds());
-
newMetrics.setContentWriteDuration(fileEvent.getContentWriteNanoseconds());
-
newMetrics.setSessionCommitDuration(fileEvent.getSessionCommitNanoseconds());
-
newMetrics.setGarbageCollectionDuration(fileEvent.getGargeCollectionMillis());
-
- return newMetrics;
- }
-}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
index f15733e344..4091b7a746 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
@@ -23,16 +23,16 @@ import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
public class StatusHistoryUtil {
@@ -79,7 +79,7 @@ public class StatusHistoryUtil {
public static List<StatusDescriptorDTO> createFieldDescriptorDtos(final
Collection<MetricDescriptor<?>> metricDescriptors) {
final StatusDescriptorDTO[] standardMetricDescriptors = new
StatusDescriptorDTO[metricDescriptors.size()];
- final List<StatusDescriptorDTO> counterMetricDescriptors = new
LinkedList<>();
+ final List<StatusDescriptorDTO> counterMetricDescriptors = new
ArrayList<>();
for (final MetricDescriptor<?> metricDescriptor : metricDescriptors) {
if (metricDescriptor instanceof StandardMetricDescriptor) {
@@ -91,9 +91,10 @@ public class StatusHistoryUtil {
}
}
- // Ordered standard metric descriptors are added first than counter
metric descriptors in the order of appearance.
+ // Ordered standard metric descriptors are added first, then counter
metric descriptors in lexicographical order of their label.
+
counterMetricDescriptors.sort(Comparator.comparing(StatusDescriptorDTO::getLabel));
final List<StatusDescriptorDTO> result = new
ArrayList<>(metricDescriptors.size());
-
result.addAll(Arrays.asList(standardMetricDescriptors).stream().filter(i -> i
!= null).collect(Collectors.toList()));
+
result.addAll(Arrays.stream(standardMetricDescriptors).filter(Objects::nonNull).toList());
result.addAll(counterMetricDescriptors);
return result;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
index feff921299..e701b2be2f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
@@ -17,11 +17,13 @@
package org.apache.nifi.controller.status.history;
+import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
public enum ProcessorStatusDescriptor {
BYTES_READ(
@@ -164,8 +166,176 @@ public enum ProcessorStatusDescriptor {
}
},
true
+ ),
+
+ CPU_MILLIS(
+ "cpuTime",
+ "CPU Time (5 mins)",
+ "The total amount of time that the Processor has used the CPU in the
past 5 minutes. " +
+ "Note, this metric may be unavailable, depending on configuration of
the `nifi.performance.tracking.percentage` property.",
+ Formatter.DURATION,
+ status -> nanosToMillis(status,
ProcessingPerformanceStatus::getCpuDuration)
+ ),
+
+ CPU_PERCENTAGE(
+ "cpuPercentage",
+ "CPU Percentage (5 mins)",
+ "Of the time that the Processor was running in the past 5 minutes, the
percentage of that time that the Processor was using the CPU. " +
+ "Note, this metric may be unavailable, depending on configuration of
the `nifi.performance.tracking.percentage` property.",
+ Formatter.COUNT,
+ status -> nanosValue(status,
ProcessingPerformanceStatus::getCpuDuration) * 100 / Math.max(1,
status.getProcessingNanos()),
+ processingPercentage(CPU_MILLIS.getDescriptor()),
+ true
+ ),
+
+ CONTENT_REPO_READ_MILLIS(
+ "contentRepoReadTime",
+ "Content Repo Read Time (5 mins)",
+ "The amount of time that the Processor has spent reading from the
Content Repository in the past 5 minutes. " +
+ "Note, this metric may be unavailable, depending on configuration of
the `nifi.performance.tracking.percentage` property.",
+ Formatter.DURATION,
+ status -> nanosToMillis(status,
ProcessingPerformanceStatus::getContentReadDuration)
+ ),
+
+ CONTENT_REPO_READ_PERCENTAGE(
+ "contentRepoReadPercentage",
+ "Content Repo Read Percentage (5 mins)",
+ "Of the time that the Processor was running in the past 5 minutes, the
percentage of that time that the Processor was reading from the Content
Repository. " +
+ "Note, this metric may be unavailable, depending on configuration of
the `nifi.performance.tracking.percentage` property.",
+ Formatter.COUNT,
+ status -> nanosValue(status,
ProcessingPerformanceStatus::getContentReadDuration) * 100 / Math.max(1,
status.getProcessingNanos()),
+ processingPercentage(CONTENT_REPO_READ_MILLIS.getDescriptor()),
+ true
+ ),
+
+ CONTENT_REPO_WRITE_MILLIS(
+ "contentRepoWriteTime",
+ "Content Repo Write Time (5 mins)",
+ "The total amount of time that the Processor has spent writing to the
Content Repository in the past 5 minutes. " +
+ "Note, this metric may be unavailable, depending on configuration of
the `nifi.performance.tracking.percentage` property.",
+ Formatter.DURATION,
+ status -> nanosToMillis(status,
ProcessingPerformanceStatus::getContentWriteDuration)
+ ),
+
+ CONTENT_REPO_WRITE_PERCENTAGE(
+ "contentRepoWritePercentage",
+ "Content Repo Write Percentage (5 mins)",
+ "Of the time that the Processor was running in the past 5 minutes, the
percentage of that time that the Processor was writing to the Content
Repository. " +
+ "Note, this metric may be unavailable, depending on configuration of
the `nifi.performance.tracking.percentage` property.",
+ Formatter.COUNT,
+ status -> nanosValue(status,
ProcessingPerformanceStatus::getContentWriteDuration) * 100 / Math.max(1,
status.getProcessingNanos()),
+ processingPercentage(CONTENT_REPO_WRITE_MILLIS.getDescriptor()),
+ true
+ ),
+
+ SESSION_COMMIT_MILLIS(
+ "sessionCommitTime",
+ "Session Commit Time (5 mins)",
+ "The total amount of time that the Processor has spent waiting for the
framework to commit its ProcessSession in the past 5 minutes. " +
+ "Note, this metric may be unavailable, depending on configuration of
the `nifi.performance.tracking.percentage` property.",
+ Formatter.DURATION,
+ status -> nanosToMillis(status,
ProcessingPerformanceStatus::getSessionCommitDuration)
+ ),
+
+    // Percentage of the Processor's processing time that was spent committing its ProcessSession.
+    SESSION_COMMIT_PERCENTAGE(
+        "sessionCommitPercentage",
+        "Session Commit Percentage (5 mins)",
+        "Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was waiting for the framework to commit its ProcessSession. " +
+            "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
+        Formatter.COUNT,
+        // Both operands are in nanoseconds; Math.max(1, ...) guards against dividing by zero when no processing time was recorded.
+        status -> nanosValue(status, ProcessingPerformanceStatus::getSessionCommitDuration) * 100 / Math.max(1, status.getProcessingNanos()),
+        processingPercentage(SESSION_COMMIT_MILLIS.getDescriptor()),
+        true
+    ),
+
+ GARBAGE_COLLECTION_MILLIS(
+ "garbageCollectionTime",
+ "Garbage Collection Time (5 mins)",
+ "The total amount of time that the Processor has spent blocked on
Garbage Collection in the past 5 minutes. " +
+ "Note, this metric may be unavailable, depending on configuration of
the `nifi.performance.tracking.percentage` property.",
+ Formatter.DURATION,
+ status -> {
+ final ProcessingPerformanceStatus perfStatus =
status.getProcessingPerformanceStatus();
+ if (perfStatus == null) {
+ return 0L;
+ }
+ // Garbage Collection is reported in milliseconds rather than
nanos.
+ return perfStatus.getGarbageCollectionDuration();
+ }
+ ),
+
+    // Percentage of the Processor's processing time that was spent blocked on Garbage Collection.
+    GARBAGE_COLLECTION_PERCENTAGE(
+        "garbageCollectionPercentage",
+        "Garbage Collection Percentage (5 mins)",
+        "Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor spent blocked on Garbage Collection. " +
+            "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
+        Formatter.COUNT,
+        status -> {
+            final ProcessingPerformanceStatus perfStatus = status.getProcessingPerformanceStatus();
+            if (perfStatus == null) {
+                return 0L;
+            }
+
+            // Garbage Collection is reported in milliseconds while the processing time is reported in nanoseconds,
+            // so the processing time must be converted to milliseconds before the two values can be compared.
+            final long gcMillis = perfStatus.getGarbageCollectionDuration();
+            final long processingMillis = TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
+
+            // Math.max(1, ...) guards against dividing by zero when less than 1ms of processing time was recorded.
+            final long percentage = gcMillis * 100 / Math.max(1, processingMillis);
+            if (percentage == 0 && gcMillis > 0) {
+                // If the value is non-zero but less than 1%, we want to return 1%.
+                return 1L;
+            }
+            return percentage;
+        },
+        processingPercentage(GARBAGE_COLLECTION_MILLIS.getDescriptor()),
+        true
);
+
+    /**
+     * Extracts a nanosecond duration from the Processor's performance status and converts it to milliseconds.
+     * A positive duration shorter than one millisecond is reported as 1 rather than 0.
+     *
+     * @param procStatus the status whose performance metrics are read
+     * @param metricTransform selects the nanosecond value to convert
+     * @return the duration in milliseconds, or 0 if the status carries no performance metrics
+     */
+    private static long nanosToMillis(final ProcessorStatus procStatus, final Function<ProcessingPerformanceStatus, Long> metricTransform) {
+        final ProcessingPerformanceStatus performance = procStatus.getProcessingPerformanceStatus();
+        if (performance == null) {
+            return 0;
+        }
+
+        final long durationNanos = metricTransform.apply(performance);
+        final long wholeMillis = TimeUnit.NANOSECONDS.toMillis(durationNanos);
+
+        // If the value is non-zero but less than 1ms, we want to return 1ms.
+        return (wholeMillis == 0 && durationNanos > 0) ? 1 : wholeMillis;
+    }
+
+    /**
+     * Creates a reducer that combines several snapshots into a single percentage: the sum of the given
+     * millisecond metric relative to the sum of the TASK_NANOS metric over the same snapshots.
+     *
+     * @param metricDescriptor descriptor of the millisecond metric to express as a percentage
+     * @return a reducer yielding the percentage, or 0 when the summed processing time is less than one millisecond
+     */
+    private static ValueReducer<StatusSnapshot, Long> processingPercentage(final MetricDescriptor<?> metricDescriptor) {
+        return new ValueReducer<>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long procNanos = 0L;
+                long metricMillis = 0L;
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long millis = snapshot.getStatusMetric(metricDescriptor);
+                    metricMillis += millis;
+
+                    final long taskNanos = snapshot.getStatusMetric(ProcessorStatusDescriptor.TASK_NANOS.getDescriptor());
+                    procNanos += taskNanos;
+                }
+
+                // The metric is in milliseconds, so the processing time is converted to the same unit.
+                // The conversion truncates, meaning any total below 1ms becomes 0. The guard must therefore
+                // test the converted value: testing procNanos alone would still allow a division by zero.
+                final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
+                if (procMillis == 0) {
+                    return 0L;
+                }
+
+                return metricMillis * 100 / procMillis;
+            }
+        };
+    }
+
+    /**
+     * Reads a single value from the Processor's performance status without converting it.
+     *
+     * @param procStatus the status whose performance metrics are read
+     * @param metricTransform selects the value to return
+     * @return the selected value, or 0 if the status carries no performance metrics
+     */
+    private static long nanosValue(final ProcessorStatus procStatus, final Function<ProcessingPerformanceStatus, Long> metricTransform) {
+        final ProcessingPerformanceStatus performance = procStatus.getProcessingPerformanceStatus();
+        return performance == null ? 0 : metricTransform.apply(performance);
+    }
private final MetricDescriptor<ProcessorStatus> descriptor;