Repository: nifi Updated Branches: refs/heads/NIFI-1563 e49636af5 -> c335d810f
NIFI-1563: Replicate request for counters instead of pulling them from heartbeats Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c335d810 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c335d810 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c335d810 Branch: refs/heads/NIFI-1563 Commit: c335d810fb3dd05c80692caafede0a29621af357 Parents: e49636a Author: Mark Payne <[email protected]> Authored: Thu Mar 17 14:36:31 2016 -0400 Committer: Mark Payne <[email protected]> Committed: Thu Mar 17 14:36:31 2016 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/web/api/dto/CounterDTO.java | 10 ++- .../apache/nifi/web/api/dto/CountersDTO.java | 51 +++++-------- .../nifi/web/api/dto/CountersSnapshotDTO.java | 73 ++++++++++++++++++ .../web/api/dto/NodeCountersSnapshotDTO.java | 78 ++++++++++++++++++++ .../nifi/web/api/dto/status/StatusMerger.java | 47 ++++++++++++ .../cluster/manager/impl/WebClusterManager.java | 45 +++++++++-- .../nifi/web/StandardNiFiServiceFacade.java | 56 +++----------- .../apache/nifi/web/api/ControllerResource.java | 5 ++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 4 +- 9 files changed, 281 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java index 615ad93..79a64ce 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java @@ -23,7 +23,7 @@ import javax.xml.bind.annotation.XmlType; * Counter value for a specific component in a specific context. A counter is a value that a component can adjust during processing. */ @XmlType(name = "counter") -public class CounterDTO { +public class CounterDTO implements Cloneable { private String id; private String context; @@ -98,4 +98,12 @@ public class CounterDTO { this.valueCount = valueCount; } + @Override + public CounterDTO clone() { + try { + return (CounterDTO) super.clone(); + } catch (final CloneNotSupportedException cnse) { + throw new AssertionError(cnse); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java index 0f162c9..bd9c115 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java @@ -14,51 +14,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.nifi.web.api.dto; -import com.wordnik.swagger.annotations.ApiModelProperty; -import java.util.Collection; -import java.util.Date; +import java.util.List; -import javax.xml.bind.annotation.XmlType; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.web.api.dto.util.TimeAdapter; +import com.wordnik.swagger.annotations.ApiModelProperty; -/** - * All the counters in this NiFi instance at a given time. - */ -@XmlType(name = "counters") public class CountersDTO { + private CountersSnapshotDTO aggregateSnapshot; + private List<NodeCountersSnapshotDTO> nodeSnapshots; - private Date generated; - private Collection<CounterDTO> counters; - - /** - * @return the collection of counters - */ - @ApiModelProperty( - value = "All counters in the NiFi." - ) - public Collection<CounterDTO> getCounters() { - return counters; + @ApiModelProperty("A Counters snapshot that represents the aggregate values of all nodes in the cluster. If the NiFi instance is " + + "a standalone instance, rather than a cluster, this represents the stats of the single instance.") + public CountersSnapshotDTO getAggregateSnapshot() { + return aggregateSnapshot; } - public void setCounters(Collection<CounterDTO> counters) { - this.counters = counters; + public void setAggregateSnapshot(CountersSnapshotDTO aggregateSnapshot) { + this.aggregateSnapshot = aggregateSnapshot; } - /** - * @return the date/time that this report was generated - */ - @XmlJavaTypeAdapter(TimeAdapter.class) - @ApiModelProperty( - value = "The timestamp when the report was generated." - ) - public Date getGenerated() { - return generated; + @ApiModelProperty("A Counters snapshot for each node in the cluster. If the NiFi instance is a standalone instance, rather than " + + "a cluster, this may be null.") + public List<NodeCountersSnapshotDTO> getNodeSnapshots() { + return nodeSnapshots; } - public void setGenerated(Date generated) { - this.generated = generated; + public void setNodeSnapshots(List<NodeCountersSnapshotDTO> nodeSnapshots) { + this.nodeSnapshots = nodeSnapshots; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersSnapshotDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersSnapshotDTO.java new file mode 100644 index 0000000..1e1b389 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersSnapshotDTO.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; + +import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +import org.apache.nifi.web.api.dto.util.TimeAdapter; + +import com.wordnik.swagger.annotations.ApiModelProperty; + + +/** + * All the counters in this NiFi instance at a given time. + */ +@XmlType(name = "countersSnapshot") +public class CountersSnapshotDTO implements Cloneable { + + private Date generated; + private Collection<CounterDTO> counters; + + @ApiModelProperty("All counters in the NiFi.") + public Collection<CounterDTO> getCounters() { + return counters; + } + + public void setCounters(Collection<CounterDTO> counters) { + this.counters = counters; + } + + @XmlJavaTypeAdapter(TimeAdapter.class) + @ApiModelProperty("The timestamp when the report was generated.") + public Date getGenerated() { + return generated; + } + + public void setGenerated(Date generated) { + this.generated = generated; + } + + @Override + public CountersSnapshotDTO clone() { + final CountersSnapshotDTO other = new CountersSnapshotDTO(); + other.setGenerated(getGenerated()); + + final List<CounterDTO> clonedCounters = new ArrayList<>(getCounters().size()); + for (final CounterDTO counterDto : getCounters()) { + clonedCounters.add(counterDto.clone()); + } + + other.setCounters(clonedCounters); + return other; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NodeCountersSnapshotDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NodeCountersSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NodeCountersSnapshotDTO.java new file mode 100644 index 0000000..b7f19c7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NodeCountersSnapshotDTO.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import javax.xml.bind.annotation.XmlType; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlType(name = "nodeProcessorStatusSnapshot") +public class NodeCountersSnapshotDTO implements Cloneable { + private String nodeId; + private String address; + private Integer apiPort; + + private CountersSnapshotDTO snapshot; + + @ApiModelProperty("The unique ID that identifies the node") + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + @ApiModelProperty("The API address of the node") + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + @ApiModelProperty("The API port used to communicate with the node") + public Integer getApiPort() { + return apiPort; + } + + public void setApiPort(Integer apiPort) { + this.apiPort = apiPort; + } + + @ApiModelProperty("The counters from the node.") + public CountersSnapshotDTO getSnapshot() { + return snapshot; + } + + public void setSnapshot(CountersSnapshotDTO snapshot) { + this.snapshot = snapshot; + } + + @Override + public NodeCountersSnapshotDTO clone() { + final NodeCountersSnapshotDTO other = new NodeCountersSnapshotDTO(); + other.setNodeId(getNodeId()); + other.setAddress(getAddress()); + other.setApiPort(getApiPort()); + other.setSnapshot(getSnapshot().clone()); + return other; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java index 9568b6f..0237e62 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java @@ -30,6 +30,10 @@ import org.apache.nifi.controller.status.RunStatus; import org.apache.nifi.controller.status.TransmissionStatus; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.api.dto.BulletinDTO; +import org.apache.nifi.web.api.dto.CounterDTO; +import org.apache.nifi.web.api.dto.CountersDTO; +import org.apache.nifi.web.api.dto.CountersSnapshotDTO; +import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO; import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO; @@ -522,6 +526,49 @@ public class StatusMerger { target.setCollectionTime(FormatUtils.formatHoursMinutesSeconds(target.getCollectionMillis(), TimeUnit.MILLISECONDS)); } + public static void merge(final CountersDTO target, final CountersDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + + List<NodeCountersSnapshotDTO> nodeSnapshots = target.getNodeSnapshots(); + if (nodeSnapshots == null) { + nodeSnapshots = new ArrayList<>(); + } + + final NodeCountersSnapshotDTO nodeCountersSnapshot = new NodeCountersSnapshotDTO(); + nodeCountersSnapshot.setNodeId(nodeId); + nodeCountersSnapshot.setAddress(nodeAddress); + nodeCountersSnapshot.setApiPort(nodeApiPort); + nodeCountersSnapshot.setSnapshot(toMerge.getAggregateSnapshot()); + + nodeSnapshots.add(nodeCountersSnapshot); + + target.setNodeSnapshots(nodeSnapshots); + } + + public static void merge(final CountersSnapshotDTO target, final CountersSnapshotDTO toMerge) { + final Map<String, CounterDTO> counters = new HashMap<>(); + + for (final CounterDTO counter : target.getCounters()) { + counters.put(counter.getId(), counter); + } + + for (final CounterDTO counter : toMerge.getCounters()) { + final CounterDTO existing = counters.get(counter.getId()); + if (existing == null) { + counters.put(counter.getId(), counter); + } else { + merge(existing, counter); + } + } + + target.setCounters(counters.values()); + } + + public static void merge(final CounterDTO target, final CounterDTO toMerge) { + target.setValueCount(target.getValueCount() + toMerge.getValueCount()); + target.setValue(FormatUtils.formatCount(target.getValueCount())); + } + public static int getUtilization(final double used, final double total) { return (int) Math.round((used / total) * 100); http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index c78b1c9..9f5b3fc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -200,6 +200,7 @@ import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.CountersDTO; import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; @@ -234,6 +235,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.ControllerStatusEntity; +import org.apache.nifi.web.api.entity.CountersEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowSnippetEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity; @@ -337,7 +339,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}"); public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+"); - public static final Pattern COUNTERS_URI = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}"); public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node"; public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}"); public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/state"); @@ -347,6 +348,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state"); public static final Pattern BULLETIN_BOARD_URI_PATTERN = Pattern.compile("/nifi-api/controller/bulletin-board"); public static final Pattern SYSTEM_DIAGNOSTICS_URI_PATTERN = Pattern.compile("/nifi-api/system-diagnostics"); + public static final Pattern COUNTERS_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters"); + public static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}"); public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status/history"); @@ -2469,6 +2472,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return "GET".equalsIgnoreCase(method) && SYSTEM_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches(); } + private static boolean isCountersEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && COUNTERS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isRemoteProcessGroupEndpoint(final URI uri, final String method) { if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) { @@ -2503,8 +2510,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return false; } - private static boolean isCountersEndpoint(final URI uri) { - return COUNTERS_URI.matcher(uri.getPath()).matches(); + private static boolean isCounterEndpoint(final URI uri) { + return COUNTER_URI_PATTERN.matcher(uri.getPath()).matches(); } private static boolean isControllerServicesEndpoint(final URI uri, final String method) { @@ -2576,7 +2583,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C || isGroupStatusEndpoint(uri, method) || isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method) || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method) || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method) - || isBulletinBoardEndpoint(uri, method) || isSystemDiagnosticsEndpoint(uri, method); + || isBulletinBoardEndpoint(uri, method) || isSystemDiagnosticsEndpoint(uri, method) + || isCountersEndpoint(uri, method); } private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) { @@ -2638,6 +2646,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } + private void mergeCounters(final CountersDTO target, final Map<NodeIdentifier, CountersDTO> resultMap) { + for (final Map.Entry<NodeIdentifier, CountersDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final CountersDTO toMerge = entry.getValue(); + + StatusMerger.merge(target, toMerge, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) { ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto; for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : resultMap.entrySet()) { @@ -3870,6 +3887,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C mergeSystemDiagnostics(responseDto, resultsMap); clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isCountersEndpoint(uri, method)) { + final CountersEntity responseEntity = clientResponse.getClientResponse().getEntity(CountersEntity.class); + final CountersDTO responseDto = responseEntity.getCounters(); + + final Map<NodeIdentifier, CountersDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final CountersEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(CountersEntity.class); + final CountersDTO nodeStatus = nodeResponseEntity.getCounters(); + + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeCounters(responseDto, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); } else { if (!nodeResponsesToDrain.isEmpty()) { drainResponses(nodeResponsesToDrain); @@ -3980,7 +4015,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C * @return Whether all problematic node responses were due to a missing counter */ private boolean isMissingCounter(final Set<NodeResponse> problematicNodeResponses, final URI uri) { - if (isCountersEndpoint(uri)) { + if (isCounterEndpoint(uri)) { boolean notFound = true; for (final NodeResponse problematicResponse : problematicNodeResponses) { if (problematicResponse.getStatus() != 404) { http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 385ffd9..563e7aa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -49,7 +49,6 @@ import org.apache.nifi.authorization.Authority; import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextThreadLocal; -import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -104,6 +103,7 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersDTO; +import org.apache.nifi.web.api.dto.CountersSnapshotDTO; import org.apache.nifi.web.api.dto.DocumentedTypeDTO; import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.DtoFactory; @@ -2140,53 +2140,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public CountersDTO getCounters() { - if (properties.isClusterManager()) { - final Map<String, CounterDTO> mergedCountersMap = new HashMap<>(); - final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED); - - if (connectedNodes.isEmpty()) { - throw new NoConnectedNodesException(); - } - - for (final Node node : connectedNodes) { - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - final List<Counter> nodeCounters = node.getHeartbeatPayload().getCounters(); - if (nodeCounters == null) { - continue; - } - - // for each node, add its counter values to the aggregate values - for (final Counter nodeCounter : nodeCounters) { - final CounterDTO mergedCounter = mergedCountersMap.get(nodeCounter.getIdentifier()); - - // either create a new aggregate counter or update the aggregate counter - if (mergedCounter == null) { - // add new counter - mergedCountersMap.put(nodeCounter.getIdentifier(), dtoFactory.createCounterDto(nodeCounter)); - } else { - // update aggregate counter - mergedCounter.setValueCount(mergedCounter.getValueCount() + nodeCounter.getValue()); - mergedCounter.setValue(FormatUtils.formatCount(mergedCounter.getValueCount())); - } - } - } - - final CountersDTO mergedCounters = new CountersDTO(); - mergedCounters.setGenerated(new Date()); - mergedCounters.setCounters(mergedCountersMap.values()); - return mergedCounters; - } else { - List<Counter> counters = controllerFacade.getCounters(); - Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size()); - for (Counter counter : counters) { - counterDTOs.add(dtoFactory.createCounterDto(counter)); - } - return dtoFactory.createCountersDto(counterDTOs); + List<Counter> counters = controllerFacade.getCounters(); + Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size()); + for (Counter counter : counters) { + counterDTOs.add(dtoFactory.createCounterDto(counter)); } + final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs); + final CountersDTO countersDto = new CountersDTO(); + countersDto.setAggregateSnapshot(snapshotDto); + + return countersDto; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index 36b25f2..91ac843 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -578,6 +578,11 @@ public class ControllerResource extends ApplicationResource { ) @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) { + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + final CountersDTO countersReport = serviceFacade.getCounters(); // create the revision http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index c9355a2..5c6ea7f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -351,8 +351,8 @@ public final class DtoFactory { * @param counterDtos dtos * @return dto */ - public CountersDTO createCountersDto(final Collection<CounterDTO> counterDtos) { - final CountersDTO dto = new CountersDTO(); + public CountersSnapshotDTO createCountersDto(final Collection<CounterDTO> counterDtos) { + final CountersSnapshotDTO dto = new CountersSnapshotDTO(); dto.setCounters(counterDtos); dto.setGenerated(new Date()); return dto;
