http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java index 2eb95f7..cc3c367 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java @@ -58,7 +58,33 @@ public class StatusMerger { target.setConnectedNodes(formatCount(target.getConnectedNodeCount()) + " / " + formatCount(target.getTotalNodeCount())); } - public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge) { + public static List<BulletinDTO> mergeBulletins(final List<BulletinDTO> targetBulletins, final List<BulletinDTO> toMerge) { + final List<BulletinDTO> bulletins = new ArrayList<>(); + if (targetBulletins != null) { + bulletins.addAll(targetBulletins); + } + + if (toMerge != null) { + bulletins.addAll(toMerge); + } + + return bulletins; + } + + + public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateStatus(), toMerge.getAggregateStatus()); + + final NodeProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + 
nodeSnapshot.setNodeId(nodeId); + + target.getNodeStatuses().add(nodeSnapshot); + } + + public static void merge(final ProcessGroupStatusSnapshotDTO target, final ProcessGroupStatusSnapshotDTO toMerge) { if (target == null || toMerge == null) { return; } @@ -85,18 +111,17 @@ public class StatusMerger { target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent()); target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); - target.setBulletins(mergeBulletins(target.getBulletins(), toMerge.getBulletins())); updatePrettyPrintedFields(target); // connection status // sort by id - final Map<String, ConnectionStatusDTO> mergedConnectionMap = new HashMap<>(); - for (final ConnectionStatusDTO status : replaceNull(target.getConnectionStatus())) { + final Map<String, ConnectionStatusSnapshotDTO> mergedConnectionMap = new HashMap<>(); + for (final ConnectionStatusSnapshotDTO status : replaceNull(target.getConnectionStatusSnapshots())) { mergedConnectionMap.put(status.getId(), status); } - for (final ConnectionStatusDTO statusToMerge : replaceNull(toMerge.getConnectionStatus())) { - ConnectionStatusDTO merged = mergedConnectionMap.get(statusToMerge.getId()); + for (final ConnectionStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getConnectionStatusSnapshots())) { + ConnectionStatusSnapshotDTO merged = mergedConnectionMap.get(statusToMerge.getId()); if (merged == null) { mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -104,16 +129,16 @@ public class StatusMerger { merge(merged, statusToMerge); } - target.setConnectionStatus(mergedConnectionMap.values()); + target.setConnectionStatusSnapshots(mergedConnectionMap.values()); // processor status - final Map<String, ProcessorStatusDTO> mergedProcessorMap = new HashMap<>(); - for (final ProcessorStatusDTO status : replaceNull(target.getProcessorStatus())) { + final Map<String, ProcessorStatusSnapshotDTO> mergedProcessorMap = new HashMap<>(); 
+ for (final ProcessorStatusSnapshotDTO status : replaceNull(target.getProcessorStatusSnapshots())) { mergedProcessorMap.put(status.getId(), status); } - for (final ProcessorStatusDTO statusToMerge : replaceNull(toMerge.getProcessorStatus())) { - ProcessorStatusDTO merged = mergedProcessorMap.get(statusToMerge.getId()); + for (final ProcessorStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessorStatusSnapshots())) { + ProcessorStatusSnapshotDTO merged = mergedProcessorMap.get(statusToMerge.getId()); if (merged == null) { mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -121,16 +146,17 @@ public class StatusMerger { merge(merged, statusToMerge); } - target.setProcessorStatus(mergedProcessorMap.values()); + target.setProcessorStatusSnapshots(mergedProcessorMap.values()); + // input ports - final Map<String, PortStatusDTO> mergedInputPortMap = new HashMap<>(); - for (final PortStatusDTO status : replaceNull(target.getInputPortStatus())) { + final Map<String, PortStatusSnapshotDTO> mergedInputPortMap = new HashMap<>(); + for (final PortStatusSnapshotDTO status : replaceNull(target.getInputPortStatusSnapshots())) { mergedInputPortMap.put(status.getId(), status); } - for (final PortStatusDTO statusToMerge : replaceNull(toMerge.getInputPortStatus())) { - PortStatusDTO merged = mergedInputPortMap.get(statusToMerge.getId()); + for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getInputPortStatusSnapshots())) { + PortStatusSnapshotDTO merged = mergedInputPortMap.get(statusToMerge.getId()); if (merged == null) { mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -138,16 +164,16 @@ public class StatusMerger { merge(merged, statusToMerge); } - target.setInputPortStatus(mergedInputPortMap.values()); + target.setInputPortStatusSnapshots(mergedInputPortMap.values()); // output ports - final Map<String, PortStatusDTO> mergedOutputPortMap = new HashMap<>(); - for (final PortStatusDTO 
status : replaceNull(target.getOutputPortStatus())) { + final Map<String, PortStatusSnapshotDTO> mergedOutputPortMap = new HashMap<>(); + for (final PortStatusSnapshotDTO status : replaceNull(target.getOutputPortStatusSnapshots())) { mergedOutputPortMap.put(status.getId(), status); } - for (final PortStatusDTO statusToMerge : replaceNull(toMerge.getOutputPortStatus())) { - PortStatusDTO merged = mergedOutputPortMap.get(statusToMerge.getId()); + for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getOutputPortStatusSnapshots())) { + PortStatusSnapshotDTO merged = mergedOutputPortMap.get(statusToMerge.getId()); if (merged == null) { mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -155,16 +181,16 @@ public class StatusMerger { merge(merged, statusToMerge); } - target.setOutputPortStatus(mergedOutputPortMap.values()); + target.setOutputPortStatusSnapshots(mergedOutputPortMap.values()); // child groups - final Map<String, ProcessGroupStatusDTO> mergedGroupMap = new HashMap<>(); - for (final ProcessGroupStatusDTO status : replaceNull(target.getProcessGroupStatus())) { + final Map<String, ProcessGroupStatusSnapshotDTO> mergedGroupMap = new HashMap<>(); + for (final ProcessGroupStatusSnapshotDTO status : replaceNull(target.getProcessGroupStatusSnapshots())) { mergedGroupMap.put(status.getId(), status); } - for (final ProcessGroupStatusDTO statusToMerge : replaceNull(toMerge.getProcessGroupStatus())) { - ProcessGroupStatusDTO merged = mergedGroupMap.get(statusToMerge.getId()); + for (final ProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessGroupStatusSnapshots())) { + ProcessGroupStatusSnapshotDTO merged = mergedGroupMap.get(statusToMerge.getId()); if (merged == null) { mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -172,16 +198,16 @@ public class StatusMerger { merge(merged, statusToMerge); } - target.setOutputPortStatus(mergedOutputPortMap.values()); + 
target.setProcessGroupStatusSnapshots(mergedGroupMap.values()); // remote groups - final Map<String, RemoteProcessGroupStatusDTO> mergedRemoteGroupMap = new HashMap<>(); - for (final RemoteProcessGroupStatusDTO status : replaceNull(target.getRemoteProcessGroupStatus())) { + final Map<String, RemoteProcessGroupStatusSnapshotDTO> mergedRemoteGroupMap = new HashMap<>(); + for (final RemoteProcessGroupStatusSnapshotDTO status : replaceNull(target.getRemoteProcessGroupStatusSnapshots())) { mergedRemoteGroupMap.put(status.getId(), status); } - for (final RemoteProcessGroupStatusDTO statusToMerge : replaceNull(toMerge.getRemoteProcessGroupStatus())) { - RemoteProcessGroupStatusDTO merged = mergedRemoteGroupMap.get(statusToMerge.getId()); + for (final RemoteProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getRemoteProcessGroupStatusSnapshots())) { + RemoteProcessGroupStatusSnapshotDTO merged = mergedRemoteGroupMap.get(statusToMerge.getId()); if (merged == null) { mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -189,25 +215,13 @@ public class StatusMerger { merge(merged, statusToMerge); } - target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values()); + target.setRemoteProcessGroupStatusSnapshots(mergedRemoteGroupMap.values()); } private static <T> Collection<T> replaceNull(final Collection<T> collection) { return (collection == null) ? Collections.<T> emptyList() : collection; } - public static List<BulletinDTO> mergeBulletins(final List<BulletinDTO> targetBulletins, final List<BulletinDTO> toMerge) { - final List<BulletinDTO> bulletins = new ArrayList<>(); - if (targetBulletins != null) { - bulletins.addAll(targetBulletins); - } - - if (toMerge != null) { - bulletins.addAll(toMerge); - } - - return bulletins; - } /** * Updates the fields that are "pretty printed" based on the raw values currently set.
For example, @@ -221,7 +235,7 @@ public class StatusMerger { * * @param target the DTO to update */ - public static void updatePrettyPrintedFields(final ProcessGroupStatusDTO target) { + public static void updatePrettyPrintedFields(final ProcessGroupStatusSnapshotDTO target) { target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); target.setQueuedCount(formatCount(target.getFlowFilesQueued())); target.setQueuedSize(formatDataSize(target.getBytesQueued())); @@ -235,7 +249,19 @@ public class StatusMerger { } - public static void merge(final ProcessorStatusDTO target, final ProcessorStatusDTO toMerge) { + public static void merge(final ProcessorStatusDTO target, final ProcessorStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateStatus(), toMerge.getAggregateStatus()); + + final NodeProcessorStatusSnapshotDTO nodeSnapshot = new NodeProcessorStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeStatuses().add(nodeSnapshot); + } + + public static void merge(final ProcessorStatusSnapshotDTO target, final ProcessorStatusSnapshotDTO toMerge) { if (target == null || toMerge == null) { return; } @@ -258,11 +284,10 @@ public class StatusMerger { target.setTaskCount(target.getTaskCount() + toMerge.getTaskCount()); target.setTaskDuration(target.getTaskDuration() + toMerge.getTaskDuration()); target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); - target.setBulletins(mergeBulletins(target.getBulletins(), toMerge.getBulletins())); updatePrettyPrintedFields(target); } - public static void updatePrettyPrintedFields(final ProcessorStatusDTO target) { + public static void updatePrettyPrintedFields(final ProcessorStatusSnapshotDTO target) { 
target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); target.setRead(formatDataSize(target.getBytesRead())); target.setWritten(formatDataSize(target.getBytesWritten())); @@ -276,7 +301,7 @@ public class StatusMerger { } - public static void merge(final ConnectionStatusDTO target, final ConnectionStatusDTO toMerge) { + public static void merge(final ConnectionStatusSnapshotDTO target, final ConnectionStatusSnapshotDTO toMerge) { if (target == null || toMerge == null) { return; } @@ -290,7 +315,7 @@ public class StatusMerger { updatePrettyPrintedFields(target); } - public static void updatePrettyPrintedFields(final ConnectionStatusDTO target) { + public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) { target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); target.setQueuedCount(formatCount(target.getFlowFilesQueued())); target.setQueuedSize(formatDataSize(target.getBytesQueued())); @@ -300,7 +325,7 @@ public class StatusMerger { - public static void merge(final RemoteProcessGroupStatusDTO target, final RemoteProcessGroupStatusDTO toMerge) { + public static void merge(final RemoteProcessGroupStatusSnapshotDTO target, final RemoteProcessGroupStatusSnapshotDTO toMerge) { final String transmittingValue = TransmissionStatus.Transmitting.name(); if (transmittingValue.equals(target.getTransmissionStatus()) || transmittingValue.equals(toMerge.getTransmissionStatus())) { target.setTransmissionStatus(transmittingValue); @@ -321,18 +346,17 @@ public class StatusMerger { target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent()); target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived()); target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived()); - target.setBulletins(mergeBulletins(target.getBulletins(), toMerge.getBulletins())); updatePrettyPrintedFields(target); } - public static void updatePrettyPrintedFields(final 
RemoteProcessGroupStatusDTO target) { + public static void updatePrettyPrintedFields(final RemoteProcessGroupStatusSnapshotDTO target) { target.setReceived(prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived())); target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent())); } - public static void merge(final PortStatusDTO target, final PortStatusDTO toMerge) { + public static void merge(final PortStatusSnapshotDTO target, final PortStatusSnapshotDTO toMerge) { if (target == null || toMerge == null) { return; } @@ -350,11 +374,10 @@ public class StatusMerger { target.setRunStatus(RunStatus.Invalid.name()); } - target.setBulletins(mergeBulletins(target.getBulletins(), toMerge.getBulletins())); updatePrettyPrintedFields(target); } - public static void updatePrettyPrintedFields(final PortStatusDTO target) { + public static void updatePrettyPrintedFields(final PortStatusSnapshotDTO target) { target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut())); }
http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterConnectionStatusEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterConnectionStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterConnectionStatusEntity.java deleted file mode 100644 index f211cc4..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterConnectionStatusEntity.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.web.api.entity; - -import javax.xml.bind.annotation.XmlRootElement; -import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; - -/** - * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. 
This particular entity holds a reference to a ClusterConnectionStatusDTO. - */ -@XmlRootElement(name = "clusterConnectionStatusEntity") -public class ClusterConnectionStatusEntity extends Entity { - - private ClusterConnectionStatusDTO clusterConnectionStatus; - - /** - * The ClusterConnectionStatusDTO that is being serialized. - * - * @return The ClusterConnectionStatusDTO object - */ - public ClusterConnectionStatusDTO getClusterConnectionStatus() { - return clusterConnectionStatus; - } - - public void setClusterConnectionStatus(ClusterConnectionStatusDTO clusterConnectionStatus) { - this.clusterConnectionStatus = clusterConnectionStatus; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java deleted file mode 100644 index f8b7e11..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.web.api.entity; - -import javax.xml.bind.annotation.XmlRootElement; -import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; - -/** - * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ClusterProcessGroupStatusDTO. - */ -@XmlRootElement(name = "clusterProcessGroupStatusEntity") -public class ClusterProcessGroupStatusEntity extends Entity { - - private ClusterProcessGroupStatusDTO clusterProcessGroupStatus; - - /** - * The ClusterProcessGroupStatusDTO that is being serialized. 
- * - * @return The ClusterProcessGroupStatusDTO object - */ - public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus() { - return clusterProcessGroupStatus; - } - - public void setClusterProcessGroupStatus(ClusterProcessGroupStatusDTO clusterProcessGroupStatus) { - this.clusterProcessGroupStatus = clusterProcessGroupStatus; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessorStatusEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessorStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessorStatusEntity.java deleted file mode 100644 index 2b8220f..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessorStatusEntity.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.web.api.entity; - -import javax.xml.bind.annotation.XmlRootElement; -import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; - -/** - * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ClusterProcessorStatusDTO. - */ -@XmlRootElement(name = "clusterProcessorStatusEntity") -public class ClusterProcessorStatusEntity extends Entity { - - private ClusterProcessorStatusDTO clusterProcessorStatus; - - /** - * The ClusterProcessorStatusDTO that is being serialized. - * - * @return The ClusterProcessorStatusDTO object - */ - public ClusterProcessorStatusDTO getClusterProcessorStatus() { - return clusterProcessorStatus; - } - - public void setClusterProcessorStatus(ClusterProcessorStatusDTO clusterProcessorStatus) { - this.clusterProcessorStatus = clusterProcessorStatus; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterRemoteProcessGroupStatusEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterRemoteProcessGroupStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterRemoteProcessGroupStatusEntity.java deleted file mode 100644 index 66569c5..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterRemoteProcessGroupStatusEntity.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.web.api.entity; - -import javax.xml.bind.annotation.XmlRootElement; -import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; - -/** - * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a - * ClusterRemoteProcessGroupStatusDTO. - */ -@XmlRootElement(name = "clusterRemoteProcessGroupStatusEntity") -public class ClusterRemoteProcessGroupStatusEntity extends Entity { - - private ClusterRemoteProcessGroupStatusDTO clusterRemoteProcessGroupStatus; - - /** - * The ClusterRemoteProcessGroupStatusDTO that is being serialized. 
- * - * @return The ClusterRemoteProcessGroupStatusDTO object - */ - public ClusterRemoteProcessGroupStatusDTO getClusterRemoteProcessGroupStatus() { - return clusterRemoteProcessGroupStatus; - } - - public void setClusterRemoteProcessGroupStatus(ClusterRemoteProcessGroupStatusDTO clusterRemoteProcessGroupStatus) { - this.clusterRemoteProcessGroupStatus = clusterRemoteProcessGroupStatus; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 3cd8b05..297d43f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -224,8 +224,8 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.StatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import 
org.apache.nifi.web.api.dto.status.StatusMerger; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; @@ -2617,52 +2617,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C componentState.getLocalState().setState(localStateEntries); } - @SuppressWarnings("unchecked") - private void updateBulletins(final String nodeAddress, final Collection<? extends StatusDTO>... dtos) { - for (final Collection<? extends StatusDTO> collection : dtos) { - if (collection != null) { - for (final StatusDTO dto : collection) { - final List<BulletinDTO> bulletins = dto.getBulletins(); - if (bulletins != null) { - for (final BulletinDTO bulletin : bulletins) { - bulletin.setNodeAddress(nodeAddress); - } - } - } - } - } - } - - @SuppressWarnings("unchecked") - private void updateBulletins(final ProcessGroupStatusDTO dto, final String nodeAddress) { - for (final BulletinDTO bulletin : dto.getBulletins()) { - bulletin.setNodeAddress(nodeAddress); - } - updateBulletins(nodeAddress, dto.getProcessorStatus(), dto.getInputPortStatus(), dto.getOutputPortStatus(), dto.getRemoteProcessGroupStatus()); - - if (dto.getProcessGroupStatus() != null) { - for (final ProcessGroupStatusDTO childGroup : dto.getProcessGroupStatus()) { - updateBulletins(childGroup, nodeAddress); - } - } - } private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) { ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto; for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : resultMap.entrySet()) { final NodeIdentifier nodeId = entry.getKey(); final ProcessGroupStatusDTO nodeProcessGroupStatus = entry.getValue(); - - final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); - updateBulletins(mergedProcessGroupStatus, nodeAddress); - if (nodeProcessGroupStatus == mergedProcessGroupStatus) { continue; } - final ProcessGroupStatusDTO nodeClone = nodeProcessGroupStatus.clone(); - 
for (final RemoteProcessGroupStatusDTO remoteProcessGroupStatus : nodeClone.getRemoteProcessGroupStatus()) { + final ProcessGroupStatusSnapshotDTO nodeSnapshot = nodeProcessGroupStatus.getAggregateStatus(); + for (final RemoteProcessGroupStatusSnapshotDTO remoteProcessGroupStatus : nodeSnapshot.getRemoteProcessGroupStatusSnapshots()) { final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues(); if (!nodeAuthorizationIssues.isEmpty()) { for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) { @@ -2673,10 +2640,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } - StatusMerger.merge(mergedProcessGroupStatus, nodeClone); + StatusMerger.merge(mergedProcessGroupStatus, nodeProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); } - - StatusMerger.updatePrettyPrintedFields(mergedProcessGroupStatus); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index a290fec..c6d8619 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -16,14 +16,18 @@ */ package org.apache.nifi.web; +import java.util.Collection; +import java.util.Date; +import java.util.Set; + import org.apache.nifi.controller.ScheduledState; import 
org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; import org.apache.nifi.web.api.dto.ClusterDTO; -import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ComponentHistoryDTO; +import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ControllerConfigurationDTO; import org.apache.nifi.web.api.dto.ControllerDTO; @@ -61,21 +65,13 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; -import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.NodeStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import java.util.Collection; -import java.util.Date; -import java.util.Set; - /** * Defines the NiFiServiceFacade interface. */ @@ -1530,41 +1526,6 @@ public interface NiFiServiceFacade { ClusterStatusDTO getClusterStatus(); /** - * Returns a processor's status for each node connected to the cluster. - * - * @param processorId a processor identifier - * @return The cluster processor status transfer object. 
- */ - ClusterProcessorStatusDTO getClusterProcessorStatus(String processorId); - - - /** - * Returns a connection's status for each node connected to the cluster. - * - * @param connectionId a connection identifier - * @return The cluster connection status transfer object. - */ - ClusterConnectionStatusDTO getClusterConnectionStatus(String connectionId); - - - /** - * Returns a process group's status for each node connected to the cluster. - * - * @param processorId a process group identifier - * @return The cluster process group status transfer object. - */ - ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processorId); - - - /** - * Returns a remote process group's status for each node connected to the cluster. - * - * @param remoteProcessGroupId a remote process group identifier - * @return The cluster remote process group status transfer object. - */ - ClusterRemoteProcessGroupStatusDTO getClusterRemoteProcessGroupStatus(String remoteProcessGroupId); - - /** * Returns an input port's status for each node connected to the cluster. 
* * @param inputPortId a port identifier http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 098fd64..05fe7e8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -74,10 +74,8 @@ import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.groups.ProcessGroup; @@ -143,18 +141,10 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; -import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; import 
org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.NodePortStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.NodeStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; @@ -2868,227 +2858,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { clusterManager.deleteNode(nodeId, userDn); } - private ProcessorStatus findNodeProcessorStatus(final ProcessGroupStatus groupStatus, final String processorId) { - ProcessorStatus processorStatus = null; - - for (final ProcessorStatus status : groupStatus.getProcessorStatus()) { - if (processorId.equals(status.getId())) { - processorStatus = status; - break; - } - } - - if (processorStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - processorStatus = findNodeProcessorStatus(status, processorId); - - if (processorStatus != null) { - break; - } - } - } - - return processorStatus; - } - - // TODO Refactor!!! 
- @Override - public ClusterProcessorStatusDTO getClusterProcessorStatus(String processorId) { - - final ClusterProcessorStatusDTO clusterProcessorStatusDto = new ClusterProcessorStatusDTO(); - clusterProcessorStatusDto.setNodeProcessorStatus(new ArrayList<NodeProcessorStatusDTO>()); - - // set the current time - clusterProcessorStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // attempt to find the processor stats for this node - final ProcessorStatus processorStatus = findNodeProcessorStatus(nodeStats, processorId); - - // sanity check that we have status for this processor - if (processorStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for processor id '%s'.", processorId)); - } - - if (firstNode) { - clusterProcessorStatusDto.setProcessorId(processorId); - clusterProcessorStatusDto.setProcessorName(processorStatus.getName()); - clusterProcessorStatusDto.setProcessorType(processorStatus.getType()); - clusterProcessorStatusDto.setProcessorRunStatus(processorStatus.getRunStatus().toString()); - firstNode = false; - } - - // create node processor status dto - final NodeProcessorStatusDTO nodeProcessorStatusDTO = new NodeProcessorStatusDTO(); - clusterProcessorStatusDto.getNodeProcessorStatus().add(nodeProcessorStatusDTO); - - // populate node processor status dto - final String nodeId = node.getNodeId().getId(); - nodeProcessorStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - 
nodeProcessorStatusDTO.setProcessorStatus(dtoFactory.createProcessorStatusDto(processorStatus)); - - } - - return clusterProcessorStatusDto; - } - - private ConnectionStatus findNodeConnectionStatus(final ProcessGroupStatus groupStatus, final String connectionId) { - ConnectionStatus connectionStatus = null; - - for (final ConnectionStatus status : groupStatus.getConnectionStatus()) { - if (connectionId.equals(status.getId())) { - connectionStatus = status; - break; - } - } - - if (connectionStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - connectionStatus = findNodeConnectionStatus(status, connectionId); - - if (connectionStatus != null) { - break; - } - } - } - - return connectionStatus; - } - - @Override - public ClusterConnectionStatusDTO getClusterConnectionStatus(String connectionId) { - final ClusterConnectionStatusDTO clusterConnectionStatusDto = new ClusterConnectionStatusDTO(); - clusterConnectionStatusDto.setNodeConnectionStatus(new ArrayList<NodeConnectionStatusDTO>()); - - // set the current time - clusterConnectionStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // find the connection status for this node - final ConnectionStatus connectionStatus = findNodeConnectionStatus(nodeStats, connectionId); - - // sanity check that we have status for this connection - if (connectionStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for connection id '%s'.", connectionId)); - } - - if (firstNode) { - 
clusterConnectionStatusDto.setConnectionId(connectionId); - clusterConnectionStatusDto.setConnectionName(connectionStatus.getName()); - firstNode = false; - } - - // create node connection status dto - final NodeConnectionStatusDTO nodeConnectionStatusDTO = new NodeConnectionStatusDTO(); - clusterConnectionStatusDto.getNodeConnectionStatus().add(nodeConnectionStatusDTO); - - // populate node processor status dto - final String nodeId = node.getNodeId().getId(); - nodeConnectionStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeConnectionStatusDTO.setConnectionStatus(dtoFactory.createConnectionStatusDto(connectionStatus)); - - } - - return clusterConnectionStatusDto; - } - - private ProcessGroupStatus findNodeProcessGroupStatus(final ProcessGroupStatus groupStatus, final String processGroupId) { - ProcessGroupStatus processGroupStatus = null; - - if (processGroupId.equals(groupStatus.getId())) { - processGroupStatus = groupStatus; - } - - if (processGroupStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - processGroupStatus = findNodeProcessGroupStatus(status, processGroupId); - - if (processGroupStatus != null) { - break; - } - } - } - - return processGroupStatus; - } - - @Override - public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processGroupId) { - - final ClusterProcessGroupStatusDTO clusterProcessGroupStatusDto = new ClusterProcessGroupStatusDTO(); - clusterProcessGroupStatusDto.setNodeProcessGroupStatus(new ArrayList<NodeProcessGroupStatusDTO>()); - - // set the current time - clusterProcessGroupStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final 
ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // attempt to find the process group stats for this node - final ProcessGroupStatus processGroupStatus = findNodeProcessGroupStatus(nodeStats, processGroupId); - - // sanity check that we have status for this process group - if (processGroupStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for process group id '%s'.", processGroupId)); - } - - if (firstNode) { - clusterProcessGroupStatusDto.setProcessGroupId(processGroupId); - clusterProcessGroupStatusDto.setProcessGroupName(processGroupStatus.getName()); - firstNode = false; - } - - // create node process group status dto - final NodeProcessGroupStatusDTO nodeProcessGroupStatusDTO = new NodeProcessGroupStatusDTO(); - clusterProcessGroupStatusDto.getNodeProcessGroupStatus().add(nodeProcessGroupStatusDTO); - - // populate node process group status dto - final String nodeId = node.getNodeId().getId(); - nodeProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeProcessGroupStatusDTO.setProcessGroupStatus(dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), processGroupStatus)); - - } - - return clusterProcessGroupStatusDto; - } - private PortStatus findNodeInputPortStatus(final ProcessGroupStatus groupStatus, final String inputPortId) { PortStatus portStatus = null; @@ -3256,54 +3025,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return remoteProcessGroupStatus; } - @Override - public ClusterRemoteProcessGroupStatusDTO getClusterRemoteProcessGroupStatus(String remoteProcessGroupId) { - final ClusterRemoteProcessGroupStatusDTO clusterRemoteProcessGroupStatusDto = new ClusterRemoteProcessGroupStatusDTO(); - clusterRemoteProcessGroupStatusDto.setNodeRemoteProcessGroupStatus(new 
ArrayList<NodeRemoteProcessGroupStatusDTO>()); - - // set the current time - clusterRemoteProcessGroupStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // find the remote process group for this node - final RemoteProcessGroupStatus remoteProcessGroupStatus = findNodeRemoteProcessGroupStatus(nodeStats, remoteProcessGroupId); - - // sanity check that we have status for this remote process group - if (remoteProcessGroupStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for remote process group id '%s'.", remoteProcessGroupId)); - } - - if (firstNode) { - clusterRemoteProcessGroupStatusDto.setRemoteProcessGroupId(remoteProcessGroupId); - clusterRemoteProcessGroupStatusDto.setRemoteProcessGroupName(remoteProcessGroupStatus.getName()); - firstNode = false; - } - - // create node remote process group status dto - final NodeRemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatusDTO = new NodeRemoteProcessGroupStatusDTO(); - clusterRemoteProcessGroupStatusDto.getNodeRemoteProcessGroupStatus().add(nodeRemoteProcessGroupStatusDTO); - - // populate node remote process group status dto - final String nodeId = node.getNodeId().getId(); - nodeRemoteProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeRemoteProcessGroupStatusDTO.setRemoteProcessGroupStatus(dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupStatus)); - } - - return clusterRemoteProcessGroupStatusDto; - } @Override @@ -3565,91 +3286,4 @@ public class 
StandardNiFiServiceFacade implements NiFiServiceFacade { return date1; } } - - /** - * Utility method for extracting component counts from the specified group status. - */ - private ProcessGroupCounts extractProcessGroupCounts(ProcessGroupStatus groupStatus) { - int running = 0; - int stopped = 0; - int invalid = 0; - int disabled = 0; - int activeRemotePorts = 0; - int inactiveRemotePorts = 0; - - for (final ProcessorStatus processorStatus : groupStatus.getProcessorStatus()) { - switch (processorStatus.getRunStatus()) { - case Disabled: - disabled++; - break; - case Running: - running++; - break; - case Invalid: - invalid++; - break; - default: - stopped++; - break; - } - } - - for (final PortStatus portStatus : groupStatus.getInputPortStatus()) { - switch (portStatus.getRunStatus()) { - case Disabled: - disabled++; - break; - case Running: - running++; - break; - case Invalid: - invalid++; - break; - default: - stopped++; - break; - } - } - - for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) { - switch (portStatus.getRunStatus()) { - case Disabled: - disabled++; - break; - case Running: - running++; - break; - case Invalid: - invalid++; - break; - default: - stopped++; - break; - } - } - - for (final RemoteProcessGroupStatus remoteStatus : groupStatus.getRemoteProcessGroupStatus()) { - if (remoteStatus.getActiveRemotePortCount() != null) { - activeRemotePorts += remoteStatus.getActiveRemotePortCount(); - } - if (remoteStatus.getInactiveRemotePortCount() != null) { - inactiveRemotePorts += remoteStatus.getInactiveRemotePortCount(); - } - if (CollectionUtils.isNotEmpty(remoteStatus.getAuthorizationIssues())) { - invalid++; - } - } - - for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) { - final ProcessGroupCounts childCounts = extractProcessGroupCounts(childGroupStatus); - running += childCounts.getRunningCount(); - stopped += childCounts.getStoppedCount(); - invalid += childCounts.getInvalidCount(); - disabled 
+= childCounts.getDisabledCount(); - activeRemotePorts += childCounts.getActiveRemotePortCount(); - inactiveRemotePorts += childCounts.getInactiveRemotePortCount(); - } - - return new ProcessGroupCounts(0, 0, running, stopped, invalid, disabled, activeRemotePorts, inactiveRemotePorts); - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java index 3086ab4..a989620 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java @@ -51,18 +51,10 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO; -import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; -import org.apache.nifi.web.api.entity.ClusterConnectionStatusEntity; import org.apache.nifi.web.api.entity.ClusterEntity; import org.apache.nifi.web.api.entity.ClusterPortStatusEntity; -import 
org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.ClusterProcessorStatusEntity; -import org.apache.nifi.web.api.entity.ClusterRemoteProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity; import org.apache.nifi.web.api.entity.ClusterStatusEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; @@ -521,67 +513,6 @@ public class ClusterResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a node can process the request."); } - /** - * Gets the processor status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the processor - * @return A clusterProcessorStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/processors/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets the processor status across the cluster", - response = ClusterProcessorStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") - } - ) - public Response getProcessorStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The processor id", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterProcessorStatusDTO dto = serviceFacade.getClusterProcessorStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterProcessorStatusEntity entity = new ClusterProcessorStatusEntity(); - entity.setClusterProcessorStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } /** * Gets the processor status history for every node. @@ -641,192 +572,8 @@ public class ClusterResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); } - /** - * Gets the connection status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the processor - * @return A clusterProcessorStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/connections/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets connection status across the cluster", - response = ClusterConnectionStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getConnectionStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The connection id", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterConnectionStatusDTO dto = serviceFacade.getClusterConnectionStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterConnectionStatusEntity entity = new ClusterConnectionStatusEntity(); - entity.setClusterConnectionStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the process group status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the process group - * @return A clusterProcessGroupStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/process-groups/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets process group status across the cluster", - response = ClusterProcessGroupStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getProcessGroupStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The process group id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterProcessGroupStatusDTO dto = serviceFacade.getClusterProcessGroupStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterProcessGroupStatusEntity entity = new ClusterProcessGroupStatusEntity(); - entity.setClusterProcessGroupStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the remote process group status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the remote process group - * @return A clusterRemoteProcessGroupStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/remote-process-groups/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets remote process group status across the cluster", - response = ClusterRemoteProcessGroupStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getRemoteProcessGroupStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The remote process group id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterRemoteProcessGroupStatusDTO dto = serviceFacade.getClusterRemoteProcessGroupStatus(id); - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterRemoteProcessGroupStatusEntity entity = new ClusterRemoteProcessGroupStatusEntity(); - entity.setClusterRemoteProcessGroupStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } /** * Gets the input port status for every node. http://git-wip-us.apache.org/repos/asf/nifi/blob/a901bc65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index b5fadcc..cd4bd73 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -51,7 +51,9 @@ import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.PositionDTO; 
import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.entity.FlowSnippetEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; @@ -1294,13 +1296,9 @@ public class ProcessGroupResource extends ApplicationResource { // prune the response as necessary if (!recursive) { - for (final ProcessGroupStatusDTO childProcessGroupStatus : statusReport.getProcessGroupStatus()) { - childProcessGroupStatus.setConnectionStatus(null); - childProcessGroupStatus.setProcessGroupStatus(null); - childProcessGroupStatus.setInputPortStatus(null); - childProcessGroupStatus.setOutputPortStatus(null); - childProcessGroupStatus.setProcessorStatus(null); - childProcessGroupStatus.setRemoteProcessGroupStatus(null); + prune(statusReport.getAggregateStatus()); + for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : statusReport.getNodeStatuses()) { + prune(nodeSnapshot.getStatusSnapshot()); } } @@ -1317,6 +1315,15 @@ public class ProcessGroupResource extends ApplicationResource { return clusterContext(generateOkResponse(entity)).build(); } + private void prune(final ProcessGroupStatusSnapshotDTO snapshot) { + snapshot.setConnectionStatusSnapshots(null); + snapshot.setProcessGroupStatusSnapshots(null); + snapshot.setInputPortStatusSnapshots(null); + snapshot.setOutputPortStatusSnapshots(null); + snapshot.setProcessorStatusSnapshots(null); + snapshot.setRemoteProcessGroupStatusSnapshots(null); + } + /** * Retrieves the specified remote process groups status history. *
