This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 5a31758bd08 NIFI-15615: Ensure that we include necessary values in the
ConnectorStatusDTO (#10913)
5a31758bd08 is described below
commit 5a31758bd088df6a9c59a80dbd1c15dc83af42f2
Author: Mark Payne <[email protected]>
AuthorDate: Wed Feb 18 10:54:43 2026 -0500
NIFI-15615: Ensure that we include necessary values in the
ConnectorStatusDTO (#10913)
---
.../web/api/dto/status/ConnectorStatusDTO.java | 100 +++++--
.../api/dto/status/ConnectorStatusSnapshotDTO.java | 322 +++++++++++++++++++++
.../dto/status/NodeConnectorStatusSnapshotDTO.java | 79 +++++
.../cluster/manager/ConnectorEntityMerger.java | 46 ++-
.../apache/nifi/cluster/manager/StatusMerger.java | 84 +++++-
.../cluster/manager/ConnectorEntityMergerTest.java | 152 +++++++++-
.../apache/nifi/web/StandardNiFiServiceFacade.java | 36 ++-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 58 ++++
.../org/apache/nifi/web/api/dto/EntityFactory.java | 5 +-
9 files changed, 831 insertions(+), 51 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectorStatusDTO.java
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectorStatusDTO.java
index ad19976d45e..388c22dff54 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectorStatusDTO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectorStatusDTO.java
@@ -18,12 +18,18 @@ package org.apache.nifi.web.api.dto.status;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.xml.bind.annotation.XmlType;
+import jakarta.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.web.api.dto.util.TimeAdapter;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
/**
* DTO for serializing the status of a connector.
*/
@XmlType(name = "connectorStatus")
-public class ConnectorStatusDTO extends ComponentStatusDTO {
+public class ConnectorStatusDTO implements Cloneable {
private String id;
private String groupId;
@@ -31,15 +37,17 @@ public class ConnectorStatusDTO extends ComponentStatusDTO {
private String type;
private String runStatus;
private String validationStatus;
- private Integer activeThreadCount;
+ private Date statsLastRefreshed;
+
+ private ConnectorStatusSnapshotDTO aggregateSnapshot;
+ private List<NodeConnectorStatusSnapshotDTO> nodeSnapshots;
/**
* The id of the connector.
*
* @return The connector id
*/
- @Schema(description = "The id of the connector."
- )
+ @Schema(description = "The id of the connector.")
public String getId() {
return id;
}
@@ -53,8 +61,7 @@ public class ConnectorStatusDTO extends ComponentStatusDTO {
*
* @return The group id
*/
- @Schema(description = "The id of the group this connector belongs to."
- )
+ @Schema(description = "The id of the group this connector belongs to.")
public String getGroupId() {
return groupId;
}
@@ -68,8 +75,7 @@ public class ConnectorStatusDTO extends ComponentStatusDTO {
*
* @return The connector name
*/
- @Schema(description = "The name of the connector."
- )
+ @Schema(description = "The name of the connector.")
public String getName() {
return name;
}
@@ -83,8 +89,7 @@ public class ConnectorStatusDTO extends ComponentStatusDTO {
*
* @return The connector type
*/
- @Schema(description = "The type of the connector."
- )
+ @Schema(description = "The type of the connector.")
public String getType() {
return type;
}
@@ -98,14 +103,11 @@ public class ConnectorStatusDTO extends ComponentStatusDTO
{
*
* @return The run status
*/
- @Override
- @Schema(description = "The run status of the connector."
- )
+ @Schema(description = "The run status of the connector.")
public String getRunStatus() {
return runStatus;
}
- @Override
public void setRunStatus(final String runStatus) {
this.runStatus = runStatus;
}
@@ -115,32 +117,70 @@ public class ConnectorStatusDTO extends
ComponentStatusDTO {
*
* @return The validation status
*/
- @Override
- @Schema(description = "The validation status of the connector."
- )
+ @Schema(description = "The validation status of the connector.",
+ allowableValues = {"VALID", "INVALID", "VALIDATING"})
public String getValidationStatus() {
return validationStatus;
}
- @Override
public void setValidationStatus(final String validationStatus) {
this.validationStatus = validationStatus;
}
- /**
- * The number of active threads for the connector.
- *
- * @return The active thread count
- */
- @Override
- @Schema(description = "The number of active threads for the connector."
- )
- public Integer getActiveThreadCount() {
- return activeThreadCount;
+ @XmlJavaTypeAdapter(TimeAdapter.class)
+ @Schema(description = "The timestamp of when the stats were last
refreshed.", type = "string")
+ public Date getStatsLastRefreshed() {
+ return statsLastRefreshed;
+ }
+
+ public void setStatsLastRefreshed(final Date statsLastRefreshed) {
+ this.statsLastRefreshed = statsLastRefreshed;
+ }
+
+ @Schema(description = "A status snapshot that represents the aggregate
stats of all nodes in the cluster. If the NiFi instance is "
+ + "a standalone instance, rather than a cluster, this represents the
stats of the single instance.")
+ public ConnectorStatusSnapshotDTO getAggregateSnapshot() {
+ return aggregateSnapshot;
+ }
+
+ public void setAggregateSnapshot(final ConnectorStatusSnapshotDTO
aggregateSnapshot) {
+ this.aggregateSnapshot = aggregateSnapshot;
+ }
+
+ @Schema(description = "A status snapshot for each node in the cluster. If
the NiFi instance is a standalone instance, rather than "
+ + "a cluster, this may be null.")
+ public List<NodeConnectorStatusSnapshotDTO> getNodeSnapshots() {
+ return nodeSnapshots;
+ }
+
+ public void setNodeSnapshots(final List<NodeConnectorStatusSnapshotDTO>
nodeSnapshots) {
+ this.nodeSnapshots = nodeSnapshots;
}
+ /**
+ * Creates a deep copy of this status. The aggregate snapshot and each node
+ * snapshot are cloned, and the refresh timestamp is copied, so that changes
+ * made to the copy are not visible through this instance.
+ *
+ * @return an independent copy of this connector status
+ */
@Override
- public void setActiveThreadCount(final Integer activeThreadCount) {
- this.activeThreadCount = activeThreadCount;
+ public ConnectorStatusDTO clone() {
+ final ConnectorStatusDTO other = new ConnectorStatusDTO();
+ other.setId(getId());
+ other.setGroupId(getGroupId());
+ other.setName(getName());
+ other.setType(getType());
+ other.setRunStatus(getRunStatus());
+ other.setValidationStatus(getValidationStatus());
+
+ // java.util.Date is mutable, so copy it instead of sharing the reference.
+ final Date lastRefreshed = getStatsLastRefreshed();
+ other.setStatsLastRefreshed(lastRefreshed == null ? null : new Date(lastRefreshed.getTime()));
+
+ if (getAggregateSnapshot() != null) {
+ other.setAggregateSnapshot(getAggregateSnapshot().clone());
+ }
+
+ // A null list (e.g. a standalone instance) is preserved as null in the copy.
+ final List<NodeConnectorStatusSnapshotDTO> snapshots = getNodeSnapshots();
+ if (snapshots != null) {
+ final List<NodeConnectorStatusSnapshotDTO> snapshotClones = new ArrayList<>(snapshots.size());
+ for (final NodeConnectorStatusSnapshotDTO snapshot : snapshots) {
+ snapshotClones.add(snapshot.clone());
+ }
+ other.setNodeSnapshots(snapshotClones);
+ }
+
+ return other;
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectorStatusSnapshotDTO.java
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectorStatusSnapshotDTO.java
new file mode 100644
index 00000000000..23833dfb973
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectorStatusSnapshotDTO.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.xml.bind.annotation.XmlType;
+
+/**
+ * DTO for serializing the status snapshot of a connector.
+ */
+@XmlType(name = "connectorStatusSnapshot")
+public class ConnectorStatusSnapshotDTO implements Cloneable {
+
+ private String id;
+ private String groupId;
+ private String name;
+ private String type;
+ private String runStatus;
+
+ private Integer flowFilesSent = 0;
+ private Long bytesSent = 0L;
+ private Integer flowFilesReceived = 0;
+ private Long bytesReceived = 0L;
+ private Long bytesRead = 0L;
+ private Long bytesWritten = 0L;
+ private String sent;
+ private String received;
+ private String read;
+ private String written;
+
+ private Integer flowFilesQueued = 0;
+ private Long bytesQueued = 0L;
+ private String queued;
+ private String queuedCount;
+ private String queuedSize;
+
+ private Integer activeThreadCount = 0;
+
+ private ProcessingPerformanceStatusDTO processingPerformanceStatus;
+
+ private Boolean idle;
+ private Long idleDurationMillis;
+ private String idleDuration;
+
+ @Schema(description = "The id of the connector.")
+ public String getId() {
+ return id;
+ }
+
+ public void setId(final String id) {
+ this.id = id;
+ }
+
+ @Schema(description = "The id of the parent process group to which the
connector belongs.")
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(final String groupId) {
+ this.groupId = groupId;
+ }
+
+ @Schema(description = "The name of the connector.")
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ @Schema(description = "The type of the connector.")
+ public String getType() {
+ return type;
+ }
+
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+ @Schema(description = "The run status of the connector.")
+ public String getRunStatus() {
+ return runStatus;
+ }
+
+ public void setRunStatus(final String runStatus) {
+ this.runStatus = runStatus;
+ }
+
+ @Schema(description = "The number of FlowFiles sent by this connector's
managed process group.")
+ public Integer getFlowFilesSent() {
+ return flowFilesSent;
+ }
+
+ public void setFlowFilesSent(final Integer flowFilesSent) {
+ this.flowFilesSent = flowFilesSent;
+ }
+
+ @Schema(description = "The number of bytes sent by this connector's
managed process group.")
+ public Long getBytesSent() {
+ return bytesSent;
+ }
+
+ public void setBytesSent(final Long bytesSent) {
+ this.bytesSent = bytesSent;
+ }
+
+ @Schema(description = "The number of FlowFiles received by this
connector's managed process group.")
+ public Integer getFlowFilesReceived() {
+ return flowFilesReceived;
+ }
+
+ public void setFlowFilesReceived(final Integer flowFilesReceived) {
+ this.flowFilesReceived = flowFilesReceived;
+ }
+
+ @Schema(description = "The number of bytes received by this connector's
managed process group.")
+ public Long getBytesReceived() {
+ return bytesReceived;
+ }
+
+ public void setBytesReceived(final Long bytesReceived) {
+ this.bytesReceived = bytesReceived;
+ }
+
+ @Schema(description = "The number of bytes read by processors in this
connector's managed process group.")
+ public Long getBytesRead() {
+ return bytesRead;
+ }
+
+ public void setBytesRead(final Long bytesRead) {
+ this.bytesRead = bytesRead;
+ }
+
+ @Schema(description = "The number of bytes written by processors in this
connector's managed process group.")
+ public Long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ public void setBytesWritten(final Long bytesWritten) {
+ this.bytesWritten = bytesWritten;
+ }
+
+ @Schema(description = "The count/size of data that has been sent by this
connector, pretty-printed.")
+ public String getSent() {
+ return sent;
+ }
+
+ public void setSent(final String sent) {
+ this.sent = sent;
+ }
+
+ @Schema(description = "The count/size of data that has been received by
this connector, pretty-printed.")
+ public String getReceived() {
+ return received;
+ }
+
+ public void setReceived(final String received) {
+ this.received = received;
+ }
+
+ @Schema(description = "The number of bytes read, pretty-printed.")
+ public String getRead() {
+ return read;
+ }
+
+ public void setRead(final String read) {
+ this.read = read;
+ }
+
+ @Schema(description = "The number of bytes written, pretty-printed.")
+ public String getWritten() {
+ return written;
+ }
+
+ public void setWritten(final String written) {
+ this.written = written;
+ }
+
+ @Schema(description = "The number of FlowFiles queued in this connector's
managed process group.")
+ public Integer getFlowFilesQueued() {
+ return flowFilesQueued;
+ }
+
+ public void setFlowFilesQueued(final Integer flowFilesQueued) {
+ this.flowFilesQueued = flowFilesQueued;
+ }
+
+ @Schema(description = "The number of bytes queued in this connector's
managed process group.")
+ public Long getBytesQueued() {
+ return bytesQueued;
+ }
+
+ public void setBytesQueued(final Long bytesQueued) {
+ this.bytesQueued = bytesQueued;
+ }
+
+ @Schema(description = "The count/size of queued data, pretty-printed.")
+ public String getQueued() {
+ return queued;
+ }
+
+ public void setQueued(final String queued) {
+ this.queued = queued;
+ }
+
+ @Schema(description = "The count of queued FlowFiles, pretty-printed.")
+ public String getQueuedCount() {
+ return queuedCount;
+ }
+
+ public void setQueuedCount(final String queuedCount) {
+ this.queuedCount = queuedCount;
+ }
+
+ @Schema(description = "The size of queued data, pretty-printed.")
+ public String getQueuedSize() {
+ return queuedSize;
+ }
+
+ public void setQueuedSize(final String queuedSize) {
+ this.queuedSize = queuedSize;
+ }
+
+ @Schema(description = "The number of active threads for the connector.")
+ public Integer getActiveThreadCount() {
+ return activeThreadCount;
+ }
+
+ public void setActiveThreadCount(final Integer activeThreadCount) {
+ this.activeThreadCount = activeThreadCount;
+ }
+
+ @Schema(description = "The processing performance status of the processors
in this connector's managed process group.")
+ public ProcessingPerformanceStatusDTO getProcessingPerformanceStatus() {
+ return processingPerformanceStatus;
+ }
+
+ public void setProcessingPerformanceStatus(final
ProcessingPerformanceStatusDTO processingPerformanceStatus) {
+ this.processingPerformanceStatus = processingPerformanceStatus;
+ }
+
+ @Schema(description = "Whether or not the connector is currently idle (no
FlowFiles queued and no FlowFiles processed recently).")
+ public Boolean getIdle() {
+ return idle;
+ }
+
+ public void setIdle(final Boolean idle) {
+ this.idle = idle;
+ }
+
+ @Schema(description = "The number of milliseconds the connector has been
idle, or null if the connector is not idle.")
+ public Long getIdleDurationMillis() {
+ return idleDurationMillis;
+ }
+
+ public void setIdleDurationMillis(final Long idleDurationMillis) {
+ this.idleDurationMillis = idleDurationMillis;
+ }
+
+ @Schema(description = "A human-readable representation of how long the
connector has been idle, or null if the connector is not idle.")
+ public String getIdleDuration() {
+ return idleDuration;
+ }
+
+ public void setIdleDuration(final String idleDuration) {
+ this.idleDuration = idleDuration;
+ }
+
+ @Override
+ public ConnectorStatusSnapshotDTO clone() {
+ final ConnectorStatusSnapshotDTO other = new
ConnectorStatusSnapshotDTO();
+ other.setId(getId());
+ other.setGroupId(getGroupId());
+ other.setName(getName());
+ other.setType(getType());
+ other.setRunStatus(getRunStatus());
+
+ other.setFlowFilesSent(getFlowFilesSent());
+ other.setBytesSent(getBytesSent());
+ other.setFlowFilesReceived(getFlowFilesReceived());
+ other.setBytesReceived(getBytesReceived());
+ other.setBytesRead(getBytesRead());
+ other.setBytesWritten(getBytesWritten());
+ other.setSent(getSent());
+ other.setReceived(getReceived());
+ other.setRead(getRead());
+ other.setWritten(getWritten());
+
+ other.setFlowFilesQueued(getFlowFilesQueued());
+ other.setBytesQueued(getBytesQueued());
+ other.setQueued(getQueued());
+ other.setQueuedCount(getQueuedCount());
+ other.setQueuedSize(getQueuedSize());
+
+ other.setActiveThreadCount(getActiveThreadCount());
+
+ if (getProcessingPerformanceStatus() != null) {
+
other.setProcessingPerformanceStatus(getProcessingPerformanceStatus().clone());
+ }
+
+ other.setIdle(getIdle());
+ other.setIdleDurationMillis(getIdleDurationMillis());
+ other.setIdleDuration(getIdleDuration());
+
+ return other;
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectorStatusSnapshotDTO.java
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectorStatusSnapshotDTO.java
new file mode 100644
index 00000000000..de9cc43a6c8
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectorStatusSnapshotDTO.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.xml.bind.annotation.XmlType;
+
+/**
+ * DTO for serializing the connector status snapshot for a particular node.
+ */
+@XmlType(name = "nodeConnectorStatusSnapshot")
+public class NodeConnectorStatusSnapshotDTO implements Cloneable {
+
+ private String nodeId;
+ private String address;
+ private Integer apiPort;
+
+ private ConnectorStatusSnapshotDTO statusSnapshot;
+
+ @Schema(description = "The unique ID that identifies the node")
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(final String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Schema(description = "The API address of the node")
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(final String address) {
+ this.address = address;
+ }
+
+ @Schema(description = "The API port used to communicate with the node")
+ public Integer getApiPort() {
+ return apiPort;
+ }
+
+ public void setApiPort(final Integer apiPort) {
+ this.apiPort = apiPort;
+ }
+
+ @Schema(description = "The connector status snapshot from the node.")
+ public ConnectorStatusSnapshotDTO getStatusSnapshot() {
+ return statusSnapshot;
+ }
+
+ public void setStatusSnapshot(final ConnectorStatusSnapshotDTO
statusSnapshot) {
+ this.statusSnapshot = statusSnapshot;
+ }
+
+ /**
+ * Creates a deep copy of this node snapshot.
+ *
+ * @return a copy whose status snapshot, when present, is itself a clone;
+ * a missing status snapshot stays null in the copy
+ */
+ @Override
+ public NodeConnectorStatusSnapshotDTO clone() {
+ final NodeConnectorStatusSnapshotDTO other = new NodeConnectorStatusSnapshotDTO();
+ other.setNodeId(getNodeId());
+ other.setAddress(getAddress());
+ other.setApiPort(getApiPort());
+
+ // The status snapshot may legitimately be unset, so guard against null
+ // instead of failing with a NullPointerException while cloning.
+ final ConnectorStatusSnapshotDTO snapshot = getStatusSnapshot();
+ if (snapshot != null) {
+ other.setStatusSnapshot(snapshot.clone());
+ }
+ return other;
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
index 7ef7fdf76bb..baa0f632367 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
@@ -21,8 +21,11 @@ import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO;
import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO;
import org.apache.nifi.web.api.dto.ConnectorDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
+import org.apache.nifi.web.api.dto.status.NodeConnectorStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.ConnectorEntity;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -48,10 +51,47 @@ public class ConnectorEntityMerger {
}
+ /**
+ * Merges the connector status reported by every node into the status of the
+ * client entity. The client entity's aggregate snapshot accumulates the values
+ * of all other nodes, and one node snapshot per contributing node is recorded.
+ * Does nothing when the client entity carries no status.
+ *
+ * @param clientEntity the entity being returned to the client; its status is updated in place
+ * @param entityMap the entity returned by each node, keyed by node identifier
+ */
private static void mergeStatus(final ConnectorEntity clientEntity, final Map<NodeIdentifier, ConnectorEntity> entityMap) {
- for (final ConnectorEntity nodeEntity : entityMap.values()) {
- if (nodeEntity != clientEntity && nodeEntity != null) {
- StatusMerger.merge(clientEntity.getStatus(), nodeEntity.getStatus());
+ final ConnectorStatusDTO mergedStatus = clientEntity.getStatus();
+ if (mergedStatus == null) {
+ return;
+ }
+
+ mergedStatus.setNodeSnapshots(new ArrayList<>());
+
+ // Identify the selected node (the one whose response was chosen as the client entity)
+ final NodeIdentifier selectedNodeId = entityMap.entrySet().stream()
+ .filter(e -> e.getValue() == clientEntity)
+ .map(Map.Entry::getKey)
+ .findFirst()
+ .orElse(null);
+
+ // Record the selected node's own snapshot. It is cloned here, before the loop below
+ // starts adding the other nodes' values into the aggregate snapshot.
+ if (selectedNodeId != null && mergedStatus.getAggregateSnapshot() != null) {
+ final NodeConnectorStatusSnapshotDTO selectedNodeSnapshot = new NodeConnectorStatusSnapshotDTO();
+ selectedNodeSnapshot.setStatusSnapshot(mergedStatus.getAggregateSnapshot().clone());
+ selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+ selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+ selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+ mergedStatus.getNodeSnapshots().add(selectedNodeSnapshot);
+ }
+
+ // The client entity's permissions are not modified by the status merge, so evaluate them once.
+ final boolean clientReadable = clientEntity.getPermissions() != null && Boolean.TRUE.equals(clientEntity.getPermissions().getCanRead());
+
+ // Merge snapshots from other nodes
+ for (final Map.Entry<NodeIdentifier, ConnectorEntity> entry : entityMap.entrySet()) {
+ final NodeIdentifier nodeId = entry.getKey();
+ final ConnectorEntity nodeEntity = entry.getValue();
+ if (nodeEntity == clientEntity || nodeEntity == null) {
+ continue;
+ }
+
+ final ConnectorStatusDTO nodeStatus = nodeEntity.getStatus();
+ if (nodeStatus == null) {
+ continue;
}
+
+ final boolean nodeReadable = nodeEntity.getPermissions() != null && Boolean.TRUE.equals(nodeEntity.getPermissions().getCanRead());
+ StatusMerger.merge(mergedStatus, clientReadable, nodeStatus, nodeReadable,
+ nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index 5b403e95f8b..59bcce57986 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -44,10 +44,12 @@ import
org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import
org.apache.nifi.web.api.dto.status.ConnectionStatusPredictionsSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ControllerServiceStatusDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.FlowAnalysisRuleStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeConnectorStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
@@ -1086,17 +1088,95 @@ public class StatusMerger {
}
}
- public static void merge(final ConnectorStatusDTO target, final
ConnectorStatusDTO toMerge) {
+ public static void merge(final ConnectorStatusDTO target, final boolean
targetReadablePermission, final ConnectorStatusDTO toMerge, final boolean
toMergeReadablePermission,
+ final String nodeId, final String nodeAddress,
final Integer nodeApiPort) {
if (target == null || toMerge == null) {
return;
}
- target.setActiveThreadCount(target.getActiveThreadCount() +
toMerge.getActiveThreadCount());
+ if (targetReadablePermission && !toMergeReadablePermission) {
+ target.setGroupId(toMerge.getGroupId());
+ target.setId(toMerge.getId());
+ target.setName(toMerge.getName());
+ target.setType(toMerge.getType());
+ }
if
(ValidationStatus.VALIDATING.name().equalsIgnoreCase(toMerge.getValidationStatus()))
{
target.setValidationStatus(ValidationStatus.VALIDATING.name());
} else if
(ValidationStatus.INVALID.name().equalsIgnoreCase(toMerge.getValidationStatus()))
{
target.setValidationStatus(ValidationStatus.INVALID.name());
}
+
+ merge(target.getAggregateSnapshot(), targetReadablePermission,
toMerge.getAggregateSnapshot(), toMergeReadablePermission);
+
+ if (target.getNodeSnapshots() != null) {
+ final NodeConnectorStatusSnapshotDTO nodeSnapshot = new
NodeConnectorStatusSnapshotDTO();
+ nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot());
+ nodeSnapshot.setAddress(nodeAddress);
+ nodeSnapshot.setApiPort(nodeApiPort);
+ nodeSnapshot.setNodeId(nodeId);
+
+ target.getNodeSnapshots().add(nodeSnapshot);
+ }
+ }
+
+ public static void merge(final ConnectorStatusSnapshotDTO target, final
boolean targetReadablePermission,
+ final ConnectorStatusSnapshotDTO toMerge, final
boolean toMergeReadablePermission) {
+ if (target == null || toMerge == null) {
+ return;
+ }
+
+ if (targetReadablePermission && !toMergeReadablePermission) {
+ target.setId(toMerge.getId());
+ target.setGroupId(toMerge.getGroupId());
+ target.setName(toMerge.getName());
+ target.setType(toMerge.getType());
+ }
+
+ target.setFlowFilesSent(target.getFlowFilesSent() +
toMerge.getFlowFilesSent());
+ target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
+ target.setFlowFilesReceived(target.getFlowFilesReceived() +
toMerge.getFlowFilesReceived());
+ target.setBytesReceived(target.getBytesReceived() +
toMerge.getBytesReceived());
+ target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
+ target.setBytesWritten(target.getBytesWritten() +
toMerge.getBytesWritten());
+
+ target.setFlowFilesQueued(target.getFlowFilesQueued() +
toMerge.getFlowFilesQueued());
+ target.setBytesQueued(target.getBytesQueued() +
toMerge.getBytesQueued());
+
+ target.setActiveThreadCount(target.getActiveThreadCount() +
toMerge.getActiveThreadCount());
+
+ // For idle status, the connector is considered idle only if ALL nodes
report it as idle.
+ // The idle duration is the minimum across nodes (the most recently
active node determines the duration).
+ if (Boolean.TRUE.equals(target.getIdle()) &&
Boolean.TRUE.equals(toMerge.getIdle())) {
+ if (target.getIdleDurationMillis() != null &&
toMerge.getIdleDurationMillis() != null) {
+
target.setIdleDurationMillis(Math.min(target.getIdleDurationMillis(),
toMerge.getIdleDurationMillis()));
+ } else if (toMerge.getIdleDurationMillis() != null) {
+ target.setIdleDurationMillis(toMerge.getIdleDurationMillis());
+ }
+ } else {
+ target.setIdle(false);
+ target.setIdleDurationMillis(null);
+ target.setIdleDuration(null);
+ }
+
+
ProcessingPerformanceStatusMerger.mergeStatus(target.getProcessingPerformanceStatus(),
toMerge.getProcessingPerformanceStatus());
+
+ updatePrettyPrintedFields(target);
+ }
+
+ public static void updatePrettyPrintedFields(final
ConnectorStatusSnapshotDTO target) {
+ target.setSent(prettyPrint(target.getFlowFilesSent(),
target.getBytesSent()));
+ target.setReceived(prettyPrint(target.getFlowFilesReceived(),
target.getBytesReceived()));
+ target.setRead(formatDataSize(target.getBytesRead()));
+ target.setWritten(formatDataSize(target.getBytesWritten()));
+ target.setQueued(prettyPrint(target.getFlowFilesQueued(),
target.getBytesQueued()));
+ target.setQueuedCount(formatCount(target.getFlowFilesQueued()));
+ target.setQueuedSize(formatDataSize(target.getBytesQueued()));
+
+ if (Boolean.TRUE.equals(target.getIdle()) &&
target.getIdleDurationMillis() != null) {
+
target.setIdleDuration(FormatUtils.formatHoursMinutesSeconds(target.getIdleDurationMillis(),
TimeUnit.MILLISECONDS));
+ } else {
+ target.setIdleDuration(null);
+ }
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
index 21f351b7a9f..1560afa564b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
@@ -26,6 +26,8 @@ import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.dto.PropertyGroupConfigurationDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeConnectorStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.AllowableValueEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
import org.junit.jupiter.api.Test;
@@ -35,7 +37,10 @@ import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class ConnectorEntityMergerTest {
@@ -133,9 +138,12 @@ class ConnectorEntityMergerTest {
@Test
void testMergeConnectorStatusValidationStatusPriority() {
- final ConnectorEntity clientEntity =
createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID");
- final ConnectorEntity node1Entity =
createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALIDATING");
- final ConnectorEntity node2Entity =
createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID");
+ final ConnectorEntity clientEntity =
createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID",
+ 5, 100L, 10, 200L, 50L, 75L, 10, 1000L, true, 5000L);
+ final ConnectorEntity node1Entity =
createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALIDATING",
+ 7, 150L, 12, 250L, 60L, 80L, 5, 500L, true, 3000L);
+ final ConnectorEntity node2Entity =
createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID",
+ 3, 200L, 8, 300L, 70L, 85L, 8, 800L, false, null);
final Map<NodeIdentifier, ConnectorEntity> entityMap = new HashMap<>();
entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
@@ -147,6 +155,118 @@ class ConnectorEntityMergerTest {
assertEquals("VALIDATING",
clientEntity.getStatus().getValidationStatus());
}
+ @Test
+ void testMergeStatusSnapshotsAggregated() { // Merges two idle nodes and checks that the aggregate snapshot holds the per-metric sums.
+ final ConnectorEntity clientEntity =
createConnectorEntityWithStatus("connector1", "RUNNING", 2, "VALID",
+ 5, 100L, 10, 200L, 50L, 75L, 10, 1000L, true, 5000L); // helper order: flowFilesSent, bytesSent, flowFilesReceived, bytesReceived, bytesRead, bytesWritten, flowFilesQueued, bytesQueued, idle, idleDurationMillis
+ final ConnectorEntity node1Entity =
createConnectorEntityWithStatus("connector1", "RUNNING", 3, "VALID",
+ 7, 150L, 12, 250L, 60L, 80L, 5, 500L, true, 3000L);
+
+ final Map<NodeIdentifier, ConnectorEntity> entityMap = new HashMap<>();
+ entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
+ entityMap.put(getNodeIdentifier("node1", 8001), node1Entity);
+
+ ConnectorEntityMerger.merge(clientEntity, entityMap); // the merged result is read back from clientEntity below
+
+ final ConnectorStatusDTO mergedStatus = clientEntity.getStatus();
+ assertNotNull(mergedStatus);
+ assertNotNull(mergedStatus.getAggregateSnapshot());
+ assertNotNull(mergedStatus.getNodeSnapshots());
+
+ // Verify node snapshots contain both nodes
+ assertEquals(2, mergedStatus.getNodeSnapshots().size());
+
+ // Verify aggregate snapshot has summed values
+ final ConnectorStatusSnapshotDTO aggregate =
mergedStatus.getAggregateSnapshot();
+ assertEquals(Integer.valueOf(12), aggregate.getFlowFilesSent()); //
5 + 7
+ assertEquals(Long.valueOf(250L), aggregate.getBytesSent()); //
100 + 150
+ assertEquals(Integer.valueOf(22), aggregate.getFlowFilesReceived());
// 10 + 12
+ assertEquals(Long.valueOf(450L), aggregate.getBytesReceived()); //
200 + 250
+ assertEquals(Long.valueOf(110L), aggregate.getBytesRead()); //
50 + 60
+ assertEquals(Long.valueOf(155L), aggregate.getBytesWritten()); //
75 + 80
+ assertEquals(Integer.valueOf(15), aggregate.getFlowFilesQueued()); //
10 + 5
+ assertEquals(Long.valueOf(1500L), aggregate.getBytesQueued()); //
1000 + 500
+ assertEquals(Integer.valueOf(5), aggregate.getActiveThreadCount()); //
2 + 3
+
+ // Both nodes are idle, so aggregate should be idle with min duration
+ assertTrue(aggregate.getIdle());
+ assertEquals(Long.valueOf(3000L), aggregate.getIdleDurationMillis());
// min(5000, 3000)
+ }
+
+ @Test
+ void testMergeStatusSnapshotsIdleWhenOneNodeNotIdle() { // Merges one idle and one non-idle node; the aggregate must report not idle with no idle duration.
+ final ConnectorEntity clientEntity =
createConnectorEntityWithStatus("connector1", "RUNNING", 2, "VALID",
+ 5, 100L, 10, 200L, 50L, 75L, 10, 1000L, true, 5000L); // client: idle=true, idleDurationMillis=5000
+ final ConnectorEntity node1Entity =
createConnectorEntityWithStatus("connector1", "RUNNING", 3, "VALID",
+ 7, 150L, 12, 250L, 60L, 80L, 5, 500L, false, null); // node1: idle=false, no idle duration
+
+ final Map<NodeIdentifier, ConnectorEntity> entityMap = new HashMap<>();
+ entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
+ entityMap.put(getNodeIdentifier("node1", 8001), node1Entity);
+
+ ConnectorEntityMerger.merge(clientEntity, entityMap); // the merged result is read back from clientEntity below
+
+ final ConnectorStatusSnapshotDTO aggregate =
clientEntity.getStatus().getAggregateSnapshot();
+
+ // One node is not idle, so aggregate should not be idle
+ assertFalse(aggregate.getIdle());
+ assertNull(aggregate.getIdleDurationMillis());
+ }
+
+ @Test
+ void testMergeStatusNodeSnapshotsContainCorrectNodeInfo() { // After a merge, each node snapshot must carry that node's id, API address/port and its own bytesSent value.
+ final ConnectorEntity clientEntity =
createConnectorEntityWithStatus("connector1", "RUNNING", 2, "VALID",
+ 5, 100L, 10, 200L, 50L, 75L, 10, 1000L, false, null);
+ final ConnectorEntity node1Entity =
createConnectorEntityWithStatus("connector1", "RUNNING", 3, "VALID",
+ 7, 150L, 12, 250L, 60L, 80L, 5, 500L, false, null);
+
+ final NodeIdentifier clientNodeId = getNodeIdentifier("client", 8000); // kept in locals so the assertions below can compare against the same identifiers
+ final NodeIdentifier node1NodeId = getNodeIdentifier("node1", 8001);
+
+ final Map<NodeIdentifier, ConnectorEntity> entityMap = new HashMap<>();
+ entityMap.put(clientNodeId, clientEntity);
+ entityMap.put(node1NodeId, node1Entity);
+
+ ConnectorEntityMerger.merge(clientEntity, entityMap); // the merged result is read back from clientEntity below
+
+ final List<NodeConnectorStatusSnapshotDTO> nodeSnapshots =
clientEntity.getStatus().getNodeSnapshots();
+ assertEquals(2, nodeSnapshots.size());
+
+ // Find the client node snapshot
+ final NodeConnectorStatusSnapshotDTO clientSnapshot =
nodeSnapshots.stream()
+ .filter(s -> s.getNodeId().equals(clientNodeId.getId()))
+ .findFirst()
+ .orElse(null);
+ assertNotNull(clientSnapshot);
+ assertEquals(clientNodeId.getApiAddress(),
clientSnapshot.getAddress());
+ assertEquals(clientNodeId.getApiPort(), clientSnapshot.getApiPort());
+ assertEquals(Long.valueOf(100L),
clientSnapshot.getStatusSnapshot().getBytesSent());
+
+ // Find the other node snapshot
+ final NodeConnectorStatusSnapshotDTO node1Snapshot =
nodeSnapshots.stream()
+ .filter(s -> s.getNodeId().equals(node1NodeId.getId()))
+ .findFirst()
+ .orElse(null);
+ assertNotNull(node1Snapshot);
+ assertEquals(node1NodeId.getApiAddress(), node1Snapshot.getAddress());
+ assertEquals(node1NodeId.getApiPort(), node1Snapshot.getApiPort());
+ assertEquals(Long.valueOf(150L),
node1Snapshot.getStatusSnapshot().getBytesSent());
+ }
+
+ @Test
+ void testMergeStatusWithNullStatus() { // Merging entities that carry no status must not throw and must leave the client status null.
+ final ConnectorEntity clientEntity =
createConnectorEntity("connector1", "STOPPED"); // presumably this two-argument helper leaves status unset; its body is not visible in this hunk
+ final ConnectorEntity node1Entity =
createConnectorEntity("connector1", "STOPPED");
+
+ final Map<NodeIdentifier, ConnectorEntity> entityMap = new HashMap<>();
+ entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
+ entityMap.put(getNodeIdentifier("node1", 8001), node1Entity);
+
+ // Both have null status - should not throw
+ ConnectorEntityMerger.merge(clientEntity, entityMap);
+ assertNull(clientEntity.getStatus());
+ }
+
private NodeIdentifier getNodeIdentifier(final String id, final int port) { // Test helper: builds a NodeIdentifier for the given id with every address set to "localhost".
return new NodeIdentifier(id, "localhost", port, "localhost", port +
1, "localhost", port + 2, port + 3, true); // the remaining port arguments are derived from port (+1, +2, +3) so they stay distinct per node
}
@@ -170,14 +290,36 @@ class ConnectorEntityMergerTest {
return entity;
}
- private ConnectorEntity createConnectorEntityWithStatus(final String id,
final String state, final int activeThreadCount, final String validationStatus)
{
+ private ConnectorEntity createConnectorEntityWithStatus(final String id,
final String state, final int activeThreadCount, final String validationStatus,
+ final Integer
flowFilesSent, final Long bytesSent,
+ final Integer
flowFilesReceived, final Long bytesReceived,
+ final Long
bytesRead, final Long bytesWritten,
+ final Integer
flowFilesQueued, final Long bytesQueued,
+ final Boolean
idle, final Long idleDurationMillis) {
final ConnectorEntity entity = createConnectorEntity(id, state);
+ final ConnectorStatusSnapshotDTO snapshot = new
ConnectorStatusSnapshotDTO();
+ snapshot.setId(id);
+ snapshot.setName("Test Connector");
+ snapshot.setType("TestConnector");
+ snapshot.setRunStatus(state);
+ snapshot.setActiveThreadCount(activeThreadCount);
+ snapshot.setFlowFilesSent(flowFilesSent);
+ snapshot.setBytesSent(bytesSent);
+ snapshot.setFlowFilesReceived(flowFilesReceived);
+ snapshot.setBytesReceived(bytesReceived);
+ snapshot.setBytesRead(bytesRead);
+ snapshot.setBytesWritten(bytesWritten);
+ snapshot.setFlowFilesQueued(flowFilesQueued);
+ snapshot.setBytesQueued(bytesQueued);
+ snapshot.setIdle(idle);
+ snapshot.setIdleDurationMillis(idleDurationMillis);
+
final ConnectorStatusDTO status = new ConnectorStatusDTO();
status.setId(id);
status.setRunStatus(state);
- status.setActiveThreadCount(activeThreadCount);
status.setValidationStatus(validationStatus);
+ status.setAggregateSnapshot(snapshot);
entity.setStatus(status);
return entity;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 97ea052f0e0..df2caf66076 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -324,6 +324,7 @@ import
org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.PortStatusDTO;
@@ -3547,6 +3548,12 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
return
entityFactory.createReportingTaskEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions, bulletinEntities);
}
+ private ConnectorStatusDTO createConnectorStatusDto(final ConnectorNode
connectorNode) { // Builds the connector's status DTO from the status of the process group managed by its active flow context.
+ final ProcessGroup managedProcessGroup =
connectorNode.getActiveFlowContext().getManagedProcessGroup(); // NOTE(review): the call chain is unguarded; confirm neither the active flow context nor the managed group can be null here
+ final ProcessGroupStatus managedGroupStatus =
controllerFacade.getProcessGroupStatus(managedProcessGroup.getIdentifier()); // status is looked up by the managed group's identifier
+ return dtoFactory.createConnectorStatusDto(connectorNode,
managedGroupStatus);
+ }
+
@Override
public ConnectorEntity createConnector(final Revision revision, final
ConnectorDTO connectorDTO) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
@@ -3566,7 +3573,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final ConnectorNode connector =
connectorDAO.getConnector(snapshot.getComponent().getId());
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(connector);
final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(connector));
- return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ final ConnectorStatusDTO status = createConnectorStatusDto(connector);
+ return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions, status);
}
@Override
@@ -3576,7 +3584,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final RevisionDTO revision =
dtoFactory.createRevisionDTO(revisionManager.getRevision(node.getIdentifier()));
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
- return entityFactory.createConnectorEntity(dto, revision,
permissions, operatePermissions);
+ final ConnectorStatusDTO status = createConnectorStatusDto(node);
+ return entityFactory.createConnectorEntity(dto, revision,
permissions, operatePermissions, status);
}).collect(Collectors.toSet());
}
@@ -3587,7 +3596,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final RevisionDTO revision =
dtoFactory.createRevisionDTO(revisionManager.getRevision(node.getIdentifier()));
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
- return entityFactory.createConnectorEntity(dto, revision, permissions,
operatePermissions);
+ final ConnectorStatusDTO status = createConnectorStatusDto(node);
+ return entityFactory.createConnectorEntity(dto, revision, permissions,
operatePermissions, status);
}
@Override
@@ -3614,7 +3624,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final ConnectorNode node =
connectorDAO.getConnector(snapshot.getComponent().getId());
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
- return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ final ConnectorStatusDTO statusDto = createConnectorStatusDto(node);
+ return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions, statusDto);
}
@Override
@@ -3644,7 +3655,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final PermissionsDTO operatePermissions = new PermissionsDTO();
operatePermissions.setCanRead(Boolean.FALSE);
operatePermissions.setCanWrite(Boolean.FALSE);
- return entityFactory.createConnectorEntity(dto,
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ return entityFactory.createConnectorEntity(dto,
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions, null);
}
@Override
@@ -3669,7 +3680,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final ConnectorNode node =
connectorDAO.getConnector(snapshot.getComponent().getId());
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
- return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ final ConnectorStatusDTO statusDto = createConnectorStatusDto(node);
+ return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions, statusDto);
}
@Override
@@ -3699,7 +3711,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final ConnectorNode node =
connectorDAO.getConnector(snapshot.getComponent().getId());
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
- return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ final ConnectorStatusDTO statusDto = createConnectorStatusDto(node);
+ return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions, statusDto);
}
@Override
@@ -3725,7 +3738,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final ConnectorNode node =
connectorDAO.getConnector(snapshot.getComponent().getId());
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
- return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ final ConnectorStatusDTO statusDto = createConnectorStatusDto(node);
+ return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions, statusDto);
}
@Override
@@ -3786,7 +3800,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final ConnectorNode node =
connectorDAO.getConnector(snapshot.getComponent().getId());
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
- return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ final ConnectorStatusDTO statusDto = createConnectorStatusDto(node);
+ return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions, statusDto);
}
@Override
@@ -3806,7 +3821,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final ConnectorNode node =
connectorDAO.getConnector(snapshot.getComponent().getId());
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
- return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ final ConnectorStatusDTO statusDto = createConnectorStatusDto(node);
+ return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions, statusDto);
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6d4ef631a3d..a1a7b9e3e26 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -240,6 +240,8 @@ import
org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import
org.apache.nifi.web.api.dto.status.ConnectionStatusPredictionsSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.PortStatusDTO;
import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
@@ -282,6 +284,7 @@ import java.io.File;
import java.nio.charset.StandardCharsets;
import java.text.Collator;
import java.text.NumberFormat;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -5275,6 +5278,61 @@ public final class DtoFactory {
return dto;
}
+ public ConnectorStatusDTO createConnectorStatusDto(final ConnectorNode
connectorNode, final ProcessGroupStatus managedGroupStatus) { // Builds a ConnectorStatusDTO whose aggregate snapshot carries the managed group's metrics and the connector's idle state.
+ if (connectorNode == null) {
+ return null; // no connector means no status DTO; callers receive null
+ }
+
+ final ConnectorStatusDTO statusDto = new ConnectorStatusDTO();
+ statusDto.setId(connectorNode.getIdentifier());
+ statusDto.setName(connectorNode.getName());
+ statusDto.setType(connectorNode.getCanonicalClassName());
+ statusDto.setRunStatus(connectorNode.getCurrentState().name());
+
statusDto.setValidationStatus(connectorNode.getValidationStatus().name());
+ statusDto.setStatsLastRefreshed(new Date()); // stamped with the time this DTO is created
+
+ final ConnectorStatusSnapshotDTO snapshot = new
ConnectorStatusSnapshotDTO();
+ statusDto.setAggregateSnapshot(snapshot); // attached first; the same instance is filled in below
+
+ snapshot.setId(connectorNode.getIdentifier()); // id, name, type and run status are repeated on the snapshot
+ snapshot.setName(connectorNode.getName());
+ snapshot.setType(connectorNode.getCanonicalClassName());
+ snapshot.setRunStatus(connectorNode.getCurrentState().name());
+
+ // Populate all status metrics from the managed process group
+ if (managedGroupStatus != null) { // with a null group status none of the metric setters below are called
+ snapshot.setFlowFilesSent(managedGroupStatus.getFlowFilesSent());
+ snapshot.setBytesSent(managedGroupStatus.getBytesSent());
+
snapshot.setFlowFilesReceived(managedGroupStatus.getFlowFilesReceived());
+ snapshot.setBytesReceived(managedGroupStatus.getBytesReceived());
+ snapshot.setBytesRead(managedGroupStatus.getBytesRead());
+ snapshot.setBytesWritten(managedGroupStatus.getBytesWritten());
+ snapshot.setFlowFilesQueued(managedGroupStatus.getQueuedCount());
+ snapshot.setBytesQueued(managedGroupStatus.getQueuedContentSize());
+
snapshot.setActiveThreadCount(managedGroupStatus.getActiveThreadCount());
+
+ final ProcessingPerformanceStatus performanceStatus =
managedGroupStatus.getProcessingPerformanceStatus();
+ if (performanceStatus != null) { // performance details are only set when the group reports them
+
snapshot.setProcessingPerformanceStatus(createProcessingPerformanceStatusDTO(performanceStatus));
+ }
+ }
+
+ // Populate idle status
+ final Optional<Duration> idleDuration =
connectorNode.getIdleDuration();
+ if (idleDuration.isPresent()) { // a present duration is reported as idle=true together with its millisecond and formatted values
+ snapshot.setIdle(true);
+ final long idleMillis = idleDuration.get().toMillis();
+ snapshot.setIdleDurationMillis(idleMillis);
+
snapshot.setIdleDuration(FormatUtils.formatHoursMinutesSeconds(idleMillis,
TimeUnit.MILLISECONDS));
+ } else {
+ snapshot.setIdle(false); // no idle duration fields are set on this branch
+ }
+
+ StatusMerger.updatePrettyPrintedFields(snapshot); // presumably fills display strings from the raw values set above; StatusMerger is not shown in this hunk, so confirm
+
+ return statusDto;
+ }
+
private List<ConnectorActionDTO> createConnectorActionDtos(final
ConnectorNode connector) {
return connector.getAvailableActions().stream()
.map(this::createConnectorActionDto)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index 1e1b965a142..be52686de48 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -24,6 +24,7 @@ import
org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
import org.apache.nifi.web.api.dto.status.ControllerServiceStatusDTO;
import org.apache.nifi.web.api.dto.status.FlowAnalysisRuleStatusDTO;
import org.apache.nifi.web.api.dto.status.PortStatusDTO;
@@ -132,12 +133,14 @@ public final class EntityFactory {
public ConnectorEntity createConnectorEntity(final ConnectorDTO dto,
final RevisionDTO revision,
final PermissionsDTO
permissions,
- final PermissionsDTO
operatePermissions) {
+ final PermissionsDTO
operatePermissions,
+ final ConnectorStatusDTO
status) {
final ConnectorEntity entity = new ConnectorEntity();
entity.setRevision(revision);
if (dto != null) {
entity.setPermissions(permissions);
entity.setOperatePermissions(operatePermissions);
+ entity.setStatus(status);
entity.setId(dto.getId());
if (permissions != null && permissions.getCanRead()) {
entity.setComponent(dto);