This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f262e74170 NIFI-13439 Added Performance Status to Group Status
responses
f262e74170 is described below
commit f262e741704d1ab8bd314772445f5cbac609d533
Author: Timea Barna <[email protected]>
AuthorDate: Thu Jun 27 07:41:59 2024 +0200
NIFI-13439 Added Performance Status to Group Status responses
- Added ProcessingPerformanceStatus to nifi-api
- Added Performance Status to Process Group and Processor Sources for Query
NiFi Reporting Task
This closes #9014
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/controller/status/ProcessGroupStatus.java | 28 ++++++
.../status/ProcessingPerformanceStatus.java | 107 +++++++++++++++++++++
.../nifi/controller/status/ProcessorStatus.java | 12 +++
.../datasources/ProcessGroupStatusDataSource.java | 14 ++-
.../sql/datasources/ProcessorStatusDataSource.java | 14 ++-
.../reporting/sql/TestQueryNiFiReportingTask.java | 2 +-
.../dto/status/ProcessGroupStatusSnapshotDTO.java | 14 +++
.../dto/status/ProcessingPerformanceStatusDTO.java | 100 +++++++++++++++++++
.../api/dto/status/ProcessorStatusSnapshotDTO.java | 13 +++
.../manager/ProcessingPerformanceStatusMerger.java | 34 +++++++
.../apache/nifi/cluster/manager/StatusMerger.java | 6 ++
.../apache/nifi/reporting/AbstractEventAccess.java | 27 ++++++
.../nifi/reporting/PerformanceMetricsUtil.java | 37 +++++++
.../org/apache/nifi/web/api/dto/DtoFactory.java | 26 +++++
14 files changed, 429 insertions(+), 5 deletions(-)
diff --git
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
index 8709055305..670af28449 100644
---
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
+++
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
@@ -57,6 +57,8 @@ public class ProcessGroupStatus implements Cloneable {
private Collection<PortStatus> inputPortStatus = new ArrayList<>();
private Collection<PortStatus> outputPortStatus = new ArrayList<>();
+ private ProcessingPerformanceStatus processingPerformanceStatus;
+
public String getId() {
return id;
}
@@ -273,6 +275,14 @@ public class ProcessGroupStatus implements Cloneable {
this.processingNanos = processingNanos;
}
+ public ProcessingPerformanceStatus getProcessingPerformanceStatus() {
+ return processingPerformanceStatus;
+ }
+
+ public void setProcessingPerformanceStatus(ProcessingPerformanceStatus
processingPerformanceStatus) {
+ this.processingPerformanceStatus = processingPerformanceStatus;
+ }
+
@Override
public ProcessGroupStatus clone() {
final ProcessGroupStatus clonedObj = new ProcessGroupStatus();
@@ -296,6 +306,7 @@ public class ProcessGroupStatus implements Cloneable {
clonedObj.flowFilesTransferred = flowFilesTransferred;
clonedObj.bytesTransferred = bytesTransferred;
clonedObj.processingNanos = processingNanos;
+ clonedObj.processingPerformanceStatus = processingPerformanceStatus;
if (connectionStatus != null) {
final Collection<ConnectionStatus> statusList = new ArrayList<>();
@@ -418,6 +429,9 @@ public class ProcessGroupStatus implements Cloneable {
builder.append(status);
}
+ builder.append(", processingPerformanceStatus=");
+ builder.append(processingPerformanceStatus);
+
builder.append("]");
return builder.toString();
}
@@ -620,6 +634,20 @@ public class ProcessGroupStatus implements Cloneable {
}
target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values());
+
+ final ProcessingPerformanceStatus targetPerformanceStatus =
target.getProcessingPerformanceStatus();
+ final ProcessingPerformanceStatus toMergePerformanceStatus =
toMerge.getProcessingPerformanceStatus();
+
+ if (targetPerformanceStatus != null && toMergePerformanceStatus !=
null) {
+
targetPerformanceStatus.setIdentifier(toMergePerformanceStatus.getIdentifier());
+
targetPerformanceStatus.setCpuDuration(targetPerformanceStatus.getCpuDuration()
+ toMergePerformanceStatus.getCpuDuration());
+
targetPerformanceStatus.setContentReadDuration(targetPerformanceStatus.getContentReadDuration()
+ toMergePerformanceStatus.getContentReadDuration());
+
targetPerformanceStatus.setContentWriteDuration(targetPerformanceStatus.getContentWriteDuration()
+ toMergePerformanceStatus.getContentWriteDuration());
+
targetPerformanceStatus.setSessionCommitDuration(targetPerformanceStatus.getSessionCommitDuration()
+ toMergePerformanceStatus.getSessionCommitDuration());
+
targetPerformanceStatus.setGarbageCollectionDuration(targetPerformanceStatus.getGarbageCollectionDuration()
+ toMergePerformanceStatus.getGarbageCollectionDuration());
+ } else {
+ target.setProcessingPerformanceStatus(targetPerformanceStatus);
+ }
}
public static FlowFileAvailability mergeFlowFileAvailability(final
FlowFileAvailability availabilityA, final FlowFileAvailability availabilityB) {
diff --git
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessingPerformanceStatus.java
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessingPerformanceStatus.java
new file mode 100644
index 0000000000..e87894c343
--- /dev/null
+++
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessingPerformanceStatus.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status;
+
+public class ProcessingPerformanceStatus implements Cloneable {
+
+ private String identifier;
+ private long cpuDuration;
+ private long contentReadDuration;
+ private long contentWriteDuration;
+ private long sessionCommitDuration;
+ private long garbageCollectionDuration;
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public void setIdentifier(String identifier) {
+ this.identifier = identifier;
+ }
+
+ public long getCpuDuration() {
+ return cpuDuration;
+ }
+
+ public void setCpuDuration(long cpuDuration) {
+ this.cpuDuration = cpuDuration;
+ }
+
+ public long getContentReadDuration() {
+ return contentReadDuration;
+ }
+
+ public void setContentReadDuration(long contentReadDuration) {
+ this.contentReadDuration = contentReadDuration;
+ }
+
+ public long getContentWriteDuration() {
+ return contentWriteDuration;
+ }
+
+ public void setContentWriteDuration(long contentWriteDuration) {
+ this.contentWriteDuration = contentWriteDuration;
+ }
+
+ public long getSessionCommitDuration() {
+ return sessionCommitDuration;
+ }
+
+ public void setSessionCommitDuration(long sessionCommitDuration) {
+ this.sessionCommitDuration = sessionCommitDuration;
+ }
+
+ public long getGarbageCollectionDuration() {
+ return garbageCollectionDuration;
+ }
+
+ public void setGarbageCollectionDuration(long garbageCollectionDuration) {
+ this.garbageCollectionDuration = garbageCollectionDuration;
+ }
+
+ @Override
+ public ProcessingPerformanceStatus clone() {
+ final ProcessingPerformanceStatus clonedObj = new
ProcessingPerformanceStatus();
+
+ clonedObj.identifier = identifier;
+ clonedObj.cpuDuration = cpuDuration;
+ clonedObj.contentReadDuration = contentReadDuration;
+ clonedObj.contentWriteDuration = contentWriteDuration;
+ clonedObj.sessionCommitDuration = sessionCommitDuration;
+ clonedObj.garbageCollectionDuration = garbageCollectionDuration;
+ return clonedObj;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("ProcessorPerformanceStatus [Group ID= ");
+ builder.append(identifier);
+ builder.append(", cpuDuration= ");
+ builder.append(cpuDuration);
+ builder.append(", contentReadDuration= ");
+ builder.append(contentReadDuration);
+ builder.append(", contentWriteDuration= ");
+ builder.append(contentWriteDuration);
+ builder.append(", sessionCommitDuration= ");
+ builder.append(sessionCommitDuration);
+ builder.append(", garbageCollectionDuration= ");
+ builder.append(garbageCollectionDuration);
+ builder.append("]");
+ return builder.toString();
+ }
+}
diff --git
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
index ba90534239..380829d38f 100644
---
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
+++
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
@@ -48,6 +48,7 @@ public class ProcessorStatus implements Cloneable {
private int flowFilesSent;
private long bytesSent;
private Map<String, Long> counters;
+ private ProcessingPerformanceStatus processingPerformanceStatus;
public String getId() {
return id;
@@ -241,6 +242,14 @@ public class ProcessorStatus implements Cloneable {
this.counters = counters;
}
+ public ProcessingPerformanceStatus getProcessingPerformanceStatus() {
+ return processingPerformanceStatus;
+ }
+
+ public void setProcessingPerformanceStatus(ProcessingPerformanceStatus
processingPerformanceStatus) {
+ this.processingPerformanceStatus = processingPerformanceStatus;
+ }
+
@Override
public ProcessorStatus clone() {
final ProcessorStatus clonedObj = new ProcessorStatus();
@@ -267,6 +276,7 @@ public class ProcessorStatus implements Cloneable {
clonedObj.executionNode = executionNode;
clonedObj.type = type;
clonedObj.counters = counters == null ? null : new HashMap<>(counters);
+ clonedObj.processingPerformanceStatus = processingPerformanceStatus;
return clonedObj;
}
@@ -307,6 +317,8 @@ public class ProcessorStatus implements Cloneable {
builder.append(terminatedThreadCount);
builder.append(", counters=");
builder.append(counters);
+ builder.append(", processingPerformanceStatus=");
+ builder.append(processingPerformanceStatus);
builder.append("]");
return builder.toString();
}
diff --git
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
index 614682ff2f..9db3963d99 100644
---
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
+++
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
@@ -50,7 +50,12 @@ public class ProcessGroupStatusDataSource implements
ResettableDataSource {
new ColumnSchema("activeThreadCount", int.class, false),
new ColumnSchema("terminatedThreadCount", int.class, false),
new ColumnSchema("versionedFlowState", String.class, false),
- new ColumnSchema("processingNanos", long.class, false)
+ new ColumnSchema("processingNanos", long.class, false),
+ new ColumnSchema("cpuDuration", long.class, false),
+ new ColumnSchema("contentReadDuration", long.class, false),
+ new ColumnSchema("contentWriteDuration", long.class, false),
+ new ColumnSchema("sessionCommitDuration", long.class, false),
+ new ColumnSchema("garbageCollectionDuration", long.class, false)
));
@@ -123,7 +128,12 @@ public class ProcessGroupStatusDataSource implements
ResettableDataSource {
status.getTerminatedThreadCount(),
status.getQueuedCount(),
status.getVersionedFlowState() == null ? null :
status.getVersionedFlowState().name(),
- status.getProcessingNanos()
+ status.getProcessingNanos(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getCpuDuration(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getContentReadDuration(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getContentWriteDuration(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getSessionCommitDuration(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getGarbageCollectionDuration()
};
}
diff --git
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessorStatusDataSource.java
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessorStatusDataSource.java
index e3aade120a..ba19cce713 100644
---
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessorStatusDataSource.java
+++
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessorStatusDataSource.java
@@ -53,7 +53,12 @@ public class ProcessorStatusDataSource implements
ResettableDataSource {
new ColumnSchema("invocations", int.class, false),
new ColumnSchema("processingNanos", long.class, false),
new ColumnSchema("runStatus", String.class, false),
- new ColumnSchema("executionNode", String.class, false)
+ new ColumnSchema("executionNode", String.class, false),
+ new ColumnSchema("cpuDuration", long.class, false),
+ new ColumnSchema("contentReadDuration", long.class, false),
+ new ColumnSchema("contentWriteDuration", long.class, false),
+ new ColumnSchema("sessionCommitDuration", long.class, false),
+ new ColumnSchema("garbageCollectionDuration", long.class, false)
));
@@ -123,7 +128,12 @@ public class ProcessorStatusDataSource implements
ResettableDataSource {
status.getInvocations(),
status.getProcessingNanos(),
status.getRunStatus().name(),
- status.getExecutionNode() == null ? null :
status.getExecutionNode().name()
+ status.getExecutionNode() == null ? null :
status.getExecutionNode().name(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getCpuDuration(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getContentReadDuration(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getContentWriteDuration(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getSessionCommitDuration(),
+ status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getGarbageCollectionDuration()
};
}
}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
index ab80bea754..d79e2473a2 100644
---
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
@@ -502,7 +502,7 @@ class TestQueryNiFiReportingTask {
assertEquals(4, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
- assertEquals(21, row.size());
+ assertEquals(26, row.size());
assertEquals(1L, row.get("bytesRead"));
// Validate the second row
row = rows.get(1);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
index 39593473c5..f6254a7647 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
@@ -81,6 +81,9 @@ public class ProcessGroupStatusSnapshotDTO implements
Cloneable {
private Integer terminatedThreadCount = 0;
private Long processingNanos = 0L;
+ @Schema(description = "Represents the processing performance for all the
processors in the given process group.")
+ private ProcessingPerformanceStatusDTO processingPerformanceStatus;
+
/**
* The id for the process group.
*
@@ -520,6 +523,14 @@ public class ProcessGroupStatusSnapshotDTO implements
Cloneable {
this.processingNanos = processingNanos;
}
+ public ProcessingPerformanceStatusDTO getProcessingPerformanceStatus() {
+ return processingPerformanceStatus;
+ }
+
+ public void setProcessingPerformanceStatus(ProcessingPerformanceStatusDTO
processingPerformanceStatus) {
+ this.processingPerformanceStatus = processingPerformanceStatus;
+ }
+
@Override
public ProcessGroupStatusSnapshotDTO clone() {
final ProcessGroupStatusSnapshotDTO other = new
ProcessGroupStatusSnapshotDTO();
@@ -598,6 +609,9 @@ public class ProcessGroupStatusSnapshotDTO implements
Cloneable {
other.setProcessGroupStatusSnapshots(childGroups);
}
+
+ other.setProcessingPerformanceStatus(getProcessingPerformanceStatus());
+
return other;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessingPerformanceStatusDTO.java
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessingPerformanceStatusDTO.java
new file mode 100644
index 0000000000..a63416c396
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessingPerformanceStatusDTO.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.xml.bind.annotation.XmlType;
+
+/**
+ * DTO for serializing the performance status of a processor.
+ */
+@XmlType(name = "processingPerformanceStatus")
+public class ProcessingPerformanceStatusDTO implements Cloneable {
+ private String identifier;
+ private long cpuDuration;
+ private long contentReadDuration;
+ private long contentWriteDuration;
+ private long sessionCommitDuration;
+ private long garbageCollectionDuration;
+
+ @Schema(description = "The unique ID of the process group that the
Processor belongs to")
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public void setIdentifier(String identifier) {
+ this.identifier = identifier;
+ }
+
+ @Schema(description = "The number of nanoseconds has spent on CPU usage in
the last 5 minutes.")
+ public long getCpuDuration() {
+ return cpuDuration;
+ }
+
+ public void setCpuDuration(long cpuDuration) {
+ this.cpuDuration = cpuDuration;
+ }
+
+ @Schema(description = "The number of nanoseconds has spent to read content
in the last 5 minutes.")
+ public long getContentReadDuration() {
+ return contentReadDuration;
+ }
+
+ public void setContentReadDuration(long contentReadDuration) {
+ this.contentReadDuration = contentReadDuration;
+ }
+
+ @Schema(description = "The number of nanoseconds has spent to write
content in the last 5 minutes.")
+ public long getContentWriteDuration() {
+ return contentWriteDuration;
+ }
+
+ public void setContentWriteDuration(long contentWriteDuration) {
+ this.contentWriteDuration = contentWriteDuration;
+ }
+
+ @Schema(description = "The number of nanoseconds has spent running to
commit sessions the last 5 minutes.")
+ public long getSessionCommitDuration() {
+ return sessionCommitDuration;
+ }
+
+ public void setSessionCommitDuration(long sessionCommitDuration) {
+ this.sessionCommitDuration = sessionCommitDuration;
+ }
+
+ @Schema(description = "The number of nanoseconds has spent running garbage
collection in the last 5 minutes.")
+ public long getGarbageCollectionDuration() {
+ return garbageCollectionDuration;
+ }
+
+ public void setGarbageCollectionDuration(long garbageCollectionDuration) {
+ this.garbageCollectionDuration = garbageCollectionDuration;
+ }
+
+ @Override
+ public ProcessingPerformanceStatusDTO clone() {
+ final ProcessingPerformanceStatusDTO clonedObj = new
ProcessingPerformanceStatusDTO();
+
+ clonedObj.identifier = identifier;
+ clonedObj.cpuDuration = cpuDuration;
+ clonedObj.contentReadDuration = contentReadDuration;
+ clonedObj.contentWriteDuration = contentWriteDuration;
+ clonedObj.sessionCommitDuration = sessionCommitDuration;
+ clonedObj.garbageCollectionDuration = garbageCollectionDuration;
+ return clonedObj;
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
index 2e9d1ac245..2eaa89488c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
@@ -53,6 +53,9 @@ public class ProcessorStatusSnapshotDTO implements Cloneable {
private Integer activeThreadCount = 0;
private Integer terminatedThreadCount = 0;
+ @Schema(description = "Represents the processor's processing performance.")
+ private ProcessingPerformanceStatusDTO processingPerformanceStatus;
+
/* getters / setters */
/**
* @return The processor id
@@ -295,6 +298,14 @@ public class ProcessorStatusSnapshotDTO implements
Cloneable {
this.tasksDurationNanos = taskNanos;
}
+ public ProcessingPerformanceStatusDTO getProcessingPerformanceStatus() {
+ return processingPerformanceStatus;
+ }
+
+ public void setProcessingPerformanceStatus(ProcessingPerformanceStatusDTO
processingPerformanceStatus) {
+ this.processingPerformanceStatus = processingPerformanceStatus;
+ }
+
@Override
public ProcessorStatusSnapshotDTO clone() {
final ProcessorStatusSnapshotDTO other = new
ProcessorStatusSnapshotDTO();
@@ -322,6 +333,8 @@ public class ProcessorStatusSnapshotDTO implements
Cloneable {
other.setWritten(getWritten());
other.setTasks(getTasks());
+ other.setProcessingPerformanceStatus(getProcessingPerformanceStatus());
+
return other;
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessingPerformanceStatusMerger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessingPerformanceStatusMerger.java
new file mode 100644
index 0000000000..786c9e1cdd
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessingPerformanceStatusMerger.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.web.api.dto.status.ProcessingPerformanceStatusDTO;
+
+public class ProcessingPerformanceStatusMerger {
+ public static void mergeStatus(final ProcessingPerformanceStatusDTO
target, final ProcessingPerformanceStatusDTO toMerge) {
+ if (target == null || toMerge == null) {
+ return;
+ }
+
+ target.setIdentifier(toMerge.getIdentifier());
+ target.setCpuDuration(target.getCpuDuration() +
toMerge.getCpuDuration());
+ target.setContentReadDuration(target.getContentReadDuration() +
toMerge.getContentReadDuration());
+ target.setContentWriteDuration(target.getContentWriteDuration() +
toMerge.getContentWriteDuration());
+ target.setSessionCommitDuration(target.getSessionCommitDuration() +
toMerge.getSessionCommitDuration());
+
target.setGarbageCollectionDuration(target.getGarbageCollectionDuration() +
toMerge.getGarbageCollectionDuration());
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index b20df287aa..221ab41600 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -177,6 +177,9 @@ public class StatusMerger {
target.setTerminatedThreadCount(target.getTerminatedThreadCount() +
toMerge.getTerminatedThreadCount());
target.setProcessingNanos(target.getProcessingNanos() +
toMerge.getProcessingNanos());
+
+
ProcessingPerformanceStatusMerger.mergeStatus(target.getProcessingPerformanceStatus(),
toMerge.getProcessingPerformanceStatus());
+
updatePrettyPrintedFields(target);
// connection status
@@ -454,6 +457,9 @@ public class StatusMerger {
target.setTasksDurationNanos(target.getTasksDurationNanos() +
toMerge.getTasksDurationNanos());
target.setActiveThreadCount(target.getActiveThreadCount() +
toMerge.getActiveThreadCount());
target.setTerminatedThreadCount(target.getTerminatedThreadCount() +
toMerge.getTerminatedThreadCount());
+
+
ProcessingPerformanceStatusMerger.mergeStatus(target.getProcessingPerformanceStatus(),
toMerge.getProcessingPerformanceStatus());
+
updatePrettyPrintedFields(target);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index 373b21babc..1fc75efa5f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -40,6 +40,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.LoadBalanceStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
@@ -162,6 +163,8 @@ public abstract class AbstractEventAccess implements
EventAccess {
int flowFilesTransferred = 0;
long bytesTransferred = 0;
long processingNanos = 0;
+ final ProcessingPerformanceStatus performanceStatus = new
ProcessingPerformanceStatus();
+ performanceStatus.setIdentifier(group.getIdentifier());
final boolean populateChildStatuses = currentDepth <=
recursiveStatusDepth;
@@ -192,6 +195,16 @@ public abstract class AbstractEventAccess implements
EventAccess {
bytesSent += procStat.getBytesSent();
processingNanos += procStat.getProcessingNanos();
+
+ final ProcessingPerformanceStatus processorPerformanceStatus =
procStat.getProcessingPerformanceStatus();
+
+ if (processorPerformanceStatus != null) {
+
performanceStatus.setCpuDuration(performanceStatus.getCpuDuration() +
processorPerformanceStatus.getCpuDuration());
+
performanceStatus.setContentReadDuration(performanceStatus.getContentReadDuration()
+ processorPerformanceStatus.getContentReadDuration());
+
performanceStatus.setContentWriteDuration(performanceStatus.getContentWriteDuration()
+ processorPerformanceStatus.getContentWriteDuration());
+
performanceStatus.setSessionCommitDuration(performanceStatus.getSessionCommitDuration()
+ processorPerformanceStatus.getSessionCommitDuration());
+
performanceStatus.setGarbageCollectionDuration(performanceStatus.getGarbageCollectionDuration()
+ processorPerformanceStatus.getGarbageCollectionDuration());
+ }
}
// set status for local child groups
@@ -226,6 +239,16 @@ public abstract class AbstractEventAccess implements
EventAccess {
bytesTransferred += childGroupStatus.getBytesTransferred();
processingNanos += childGroupStatus.getProcessingNanos();
+
+ final ProcessingPerformanceStatus childGroupPerformanceStatus =
childGroupStatus.getProcessingPerformanceStatus();
+
+ if (childGroupPerformanceStatus != null) {
+
performanceStatus.setCpuDuration(performanceStatus.getCpuDuration() +
childGroupPerformanceStatus.getCpuDuration());
+
performanceStatus.setContentReadDuration(performanceStatus.getContentReadDuration()
+ childGroupPerformanceStatus.getContentReadDuration());
+
performanceStatus.setContentWriteDuration(performanceStatus.getContentWriteDuration()
+ childGroupPerformanceStatus.getContentWriteDuration());
+
performanceStatus.setSessionCommitDuration(performanceStatus.getSessionCommitDuration()
+ childGroupPerformanceStatus.getSessionCommitDuration());
+
performanceStatus.setGarbageCollectionDuration(performanceStatus.getGarbageCollectionDuration()
+ childGroupPerformanceStatus.getGarbageCollectionDuration());
+ }
}
// set status for remote child groups
@@ -513,6 +536,7 @@ public abstract class AbstractEventAccess implements
EventAccess {
status.setFlowFilesTransferred(flowFilesTransferred);
status.setBytesTransferred(bytesTransferred);
status.setProcessingNanos(processingNanos);
+ status.setProcessingPerformanceStatus(performanceStatus);
final VersionControlInformation vci =
group.getVersionControlInformation();
if (vci != null) {
@@ -660,6 +684,9 @@ public abstract class AbstractEventAccess implements
EventAccess {
if (isProcessorAuthorized) {
status.setCounters(flowFileEvent.getCounters());
}
+
+ final ProcessingPerformanceStatus performanceStatus =
PerformanceMetricsUtil.getPerformanceMetrics(flowFileEvent, procNode);
+ status.setProcessingPerformanceStatus(performanceStatus);
}
// Determine the run status and get any validation error... only
validating while STOPPED
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
new file mode 100644
index 0000000000..78731d93b0
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting;
+
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
+
+public class PerformanceMetricsUtil {
+
+ public static ProcessingPerformanceStatus getPerformanceMetrics(final
FlowFileEvent fileEvent, final ProcessorNode processorNode) {
+ final ProcessingPerformanceStatus newMetrics = new
ProcessingPerformanceStatus();
+
+
newMetrics.setIdentifier(processorNode.getProcessGroup().getIdentifier());
+ newMetrics.setCpuDuration(fileEvent.getCpuNanoseconds());
+
newMetrics.setContentReadDuration(fileEvent.getContentReadNanoseconds());
+
newMetrics.setContentWriteDuration(fileEvent.getContentWriteNanoseconds());
+
newMetrics.setSessionCommitDuration(fileEvent.getSessionCommitNanoseconds());
+
newMetrics.setGarbageCollectionDuration(fileEvent.getGargeCollectionMillis());
+
+ return newMetrics;
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 647c38d449..27daac467f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -107,6 +107,7 @@ import org.apache.nifi.controller.state.SortedStateUtils;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
@@ -222,6 +223,7 @@ import org.apache.nifi.web.api.dto.status.PortStatusDTO;
import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessingPerformanceStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
@@ -1051,6 +1053,11 @@ public final class DtoFactory {
snapshot.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
snapshot.setTerminatedThreadCount(processGroupStatus.getTerminatedThreadCount());
+ final ProcessingPerformanceStatus performanceStatus =
processGroupStatus.getProcessingPerformanceStatus();
+ if (performanceStatus != null) {
+
snapshot.setProcessingPerformanceStatus(createProcessingPerformanceStatusDTO(performanceStatus));
+ }
+
StatusMerger.updatePrettyPrintedFields(snapshot);
return processGroupStatusDto;
}
@@ -1268,6 +1275,11 @@ public final class DtoFactory {
snapshot.setTerminatedThreadCount(procStatus.getTerminatedThreadCount());
snapshot.setType(procStatus.getType());
+ final ProcessingPerformanceStatus performanceStatus =
procStatus.getProcessingPerformanceStatus();
+ if (performanceStatus != null) {
+
snapshot.setProcessingPerformanceStatus(createProcessingPerformanceStatusDTO(procStatus.getProcessingPerformanceStatus()));
+ }
+
StatusMerger.updatePrettyPrintedFields(snapshot);
return dto;
}
@@ -5102,4 +5114,18 @@ public final class DtoFactory {
public void setExtensionManager(ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
}
+
+ private ProcessingPerformanceStatusDTO
createProcessingPerformanceStatusDTO(final ProcessingPerformanceStatus
performanceStatus) {
+
+ final ProcessingPerformanceStatusDTO performanceStatusDTO = new
ProcessingPerformanceStatusDTO();
+
+ performanceStatusDTO.setIdentifier(performanceStatus.getIdentifier());
+
performanceStatusDTO.setCpuDuration(performanceStatus.getCpuDuration());
+
performanceStatusDTO.setContentReadDuration(performanceStatusDTO.getContentReadDuration());
+
performanceStatusDTO.setContentWriteDuration(performanceStatus.getContentWriteDuration());
+
performanceStatusDTO.setSessionCommitDuration(performanceStatus.getSessionCommitDuration());
+
performanceStatusDTO.setGarbageCollectionDuration(performanceStatus.getGarbageCollectionDuration());
+
+ return performanceStatusDTO;
+ }
}