Repository: nifi
Updated Branches:
  refs/heads/NIFI-1563 e49636af5 -> c335d810f


NIFI-1563: Replicate request for counters instead of pulling them from 
heartbeats


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c335d810
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c335d810
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c335d810

Branch: refs/heads/NIFI-1563
Commit: c335d810fb3dd05c80692caafede0a29621af357
Parents: e49636a
Author: Mark Payne <[email protected]>
Authored: Thu Mar 17 14:36:31 2016 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Mar 17 14:36:31 2016 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/web/api/dto/CounterDTO.java | 10 ++-
 .../apache/nifi/web/api/dto/CountersDTO.java    | 51 +++++--------
 .../nifi/web/api/dto/CountersSnapshotDTO.java   | 73 ++++++++++++++++++
 .../web/api/dto/NodeCountersSnapshotDTO.java    | 78 ++++++++++++++++++++
 .../nifi/web/api/dto/status/StatusMerger.java   | 47 ++++++++++++
 .../cluster/manager/impl/WebClusterManager.java | 45 +++++++++--
 .../nifi/web/StandardNiFiServiceFacade.java     | 56 +++-----------
 .../apache/nifi/web/api/ControllerResource.java |  5 ++
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  4 +-
 9 files changed, 281 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java
index 615ad93..79a64ce 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CounterDTO.java
@@ -23,7 +23,7 @@ import javax.xml.bind.annotation.XmlType;
  * Counter value for a specific component in a specific context. A counter is 
a value that a component can adjust during processing.
  */
 @XmlType(name = "counter")
-public class CounterDTO {
+public class CounterDTO implements Cloneable {
 
     private String id;
     private String context;
@@ -98,4 +98,12 @@ public class CounterDTO {
         this.valueCount = valueCount;
     }
 
+    @Override
+    public CounterDTO clone() {
+        try {
+            return (CounterDTO) super.clone();
+        } catch (final CloneNotSupportedException cnse) {
+            throw new AssertionError(cnse);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java
index 0f162c9..bd9c115 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersDTO.java
@@ -14,51 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.web.api.dto;
 
-import com.wordnik.swagger.annotations.ApiModelProperty;
-import java.util.Collection;
-import java.util.Date;
+import java.util.List;
 
-import javax.xml.bind.annotation.XmlType;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.web.api.dto.util.TimeAdapter;
+import com.wordnik.swagger.annotations.ApiModelProperty;
 
-/**
- * All the counters in this NiFi instance at a given time.
- */
-@XmlType(name = "counters")
 public class CountersDTO {
+    private CountersSnapshotDTO aggregateSnapshot;
+    private List<NodeCountersSnapshotDTO> nodeSnapshots;
 
-    private Date generated;
-    private Collection<CounterDTO> counters;
-
-    /**
-     * @return the collection of counters
-     */
-    @ApiModelProperty(
-            value = "All counters in the NiFi."
-    )
-    public Collection<CounterDTO> getCounters() {
-        return counters;
+    @ApiModelProperty("A Counters snapshot that represents the aggregate 
values of all nodes in the cluster. If the NiFi instance is "
+        + "a standalone instance, rather than a cluster, this represents the 
stats of the single instance.")
+    public CountersSnapshotDTO getAggregateSnapshot() {
+        return aggregateSnapshot;
     }
 
-    public void setCounters(Collection<CounterDTO> counters) {
-        this.counters = counters;
+    public void setAggregateSnapshot(CountersSnapshotDTO aggregateSnapshot) {
+        this.aggregateSnapshot = aggregateSnapshot;
     }
 
-    /**
-     * @return the date/time that this report was generated
-     */
-    @XmlJavaTypeAdapter(TimeAdapter.class)
-    @ApiModelProperty(
-            value = "The timestamp when the report was generated."
-    )
-    public Date getGenerated() {
-        return generated;
+    @ApiModelProperty("A Counters snapshot for each node in the cluster. If 
the NiFi instance is a standalone instance, rather than "
+        + "a cluster, this may be null.")
+    public List<NodeCountersSnapshotDTO> getNodeSnapshots() {
+        return nodeSnapshots;
     }
 
-    public void setGenerated(Date generated) {
-        this.generated = generated;
+    public void setNodeSnapshots(List<NodeCountersSnapshotDTO> nodeSnapshots) {
+        this.nodeSnapshots = nodeSnapshots;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersSnapshotDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersSnapshotDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersSnapshotDTO.java
new file mode 100644
index 0000000..1e1b389
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/CountersSnapshotDTO.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.web.api.dto.util.TimeAdapter;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+
+
+/**
+ * All the counters in this NiFi instance at a given time.
+ */
+@XmlType(name = "countersSnapshot")
+public class CountersSnapshotDTO implements Cloneable {
+
+    private Date generated;
+    private Collection<CounterDTO> counters;
+
+    @ApiModelProperty("All counters in the NiFi.")
+    public Collection<CounterDTO> getCounters() {
+        return counters;
+    }
+
+    public void setCounters(Collection<CounterDTO> counters) {
+        this.counters = counters;
+    }
+
+    @XmlJavaTypeAdapter(TimeAdapter.class)
+    @ApiModelProperty("The timestamp when the report was generated.")
+    public Date getGenerated() {
+        return generated;
+    }
+
+    public void setGenerated(Date generated) {
+        this.generated = generated;
+    }
+
+    @Override
+    public CountersSnapshotDTO clone() {
+        final CountersSnapshotDTO other = new CountersSnapshotDTO();
+        other.setGenerated(getGenerated());
+
+        final List<CounterDTO> clonedCounters = new 
ArrayList<>(getCounters().size());
+        for (final CounterDTO counterDto : getCounters()) {
+            clonedCounters.add(counterDto.clone());
+        }
+
+        other.setCounters(clonedCounters);
+        return other;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NodeCountersSnapshotDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NodeCountersSnapshotDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NodeCountersSnapshotDTO.java
new file mode 100644
index 0000000..b7f19c7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NodeCountersSnapshotDTO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.dto;
+
+import javax.xml.bind.annotation.XmlType;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+
+@XmlType(name = "nodeProcessorStatusSnapshot")
+public class NodeCountersSnapshotDTO implements Cloneable {
+    private String nodeId;
+    private String address;
+    private Integer apiPort;
+
+    private CountersSnapshotDTO snapshot;
+
+    @ApiModelProperty("The unique ID that identifies the node")
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    @ApiModelProperty("The API address of the node")
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    @ApiModelProperty("The API port used to communicate with the node")
+    public Integer getApiPort() {
+        return apiPort;
+    }
+
+    public void setApiPort(Integer apiPort) {
+        this.apiPort = apiPort;
+    }
+
+    @ApiModelProperty("The counters from the node.")
+    public CountersSnapshotDTO getSnapshot() {
+        return snapshot;
+    }
+
+    public void setSnapshot(CountersSnapshotDTO snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @Override
+    public NodeCountersSnapshotDTO clone() {
+        final NodeCountersSnapshotDTO other = new NodeCountersSnapshotDTO();
+        other.setNodeId(getNodeId());
+        other.setAddress(getAddress());
+        other.setApiPort(getApiPort());
+        other.setSnapshot(getSnapshot().clone());
+        return other;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
index 9568b6f..0237e62 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
@@ -30,6 +30,10 @@ import org.apache.nifi.controller.status.RunStatus;
 import org.apache.nifi.controller.status.TransmissionStatus;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.web.api.dto.BulletinDTO;
+import org.apache.nifi.web.api.dto.CounterDTO;
+import org.apache.nifi.web.api.dto.CountersDTO;
+import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO;
 import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO;
@@ -522,6 +526,49 @@ public class StatusMerger {
         
target.setCollectionTime(FormatUtils.formatHoursMinutesSeconds(target.getCollectionMillis(),
 TimeUnit.MILLISECONDS));
     }
 
+    public static void merge(final CountersDTO target, final CountersDTO 
toMerge, final String nodeId, final String nodeAddress, final Integer 
nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        List<NodeCountersSnapshotDTO> nodeSnapshots = 
target.getNodeSnapshots();
+        if (nodeSnapshots == null) {
+            nodeSnapshots = new ArrayList<>();
+        }
+
+        final NodeCountersSnapshotDTO nodeCountersSnapshot = new 
NodeCountersSnapshotDTO();
+        nodeCountersSnapshot.setNodeId(nodeId);
+        nodeCountersSnapshot.setAddress(nodeAddress);
+        nodeCountersSnapshot.setApiPort(nodeApiPort);
+        nodeCountersSnapshot.setSnapshot(toMerge.getAggregateSnapshot());
+
+        nodeSnapshots.add(nodeCountersSnapshot);
+
+        target.setNodeSnapshots(nodeSnapshots);
+    }
+
+    public static void merge(final CountersSnapshotDTO target, final 
CountersSnapshotDTO toMerge) {
+        final Map<String, CounterDTO> counters = new HashMap<>();
+
+        for (final CounterDTO counter : target.getCounters()) {
+            counters.put(counter.getId(), counter);
+        }
+
+        for (final CounterDTO counter : toMerge.getCounters()) {
+            final CounterDTO existing = counters.get(counter.getId());
+            if (existing == null) {
+                counters.put(counter.getId(), counter);
+            } else {
+                merge(existing, counter);
+            }
+        }
+
+        target.setCounters(counters.values());
+    }
+
+    public static void merge(final CounterDTO target, final CounterDTO 
toMerge) {
+        target.setValueCount(target.getValueCount() + toMerge.getValueCount());
+        target.setValue(FormatUtils.formatCount(target.getValueCount()));
+    }
+
 
     public static int getUtilization(final double used, final double total) {
         return (int) Math.round((used / total) * 100);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index c78b1c9..9f5b3fc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -200,6 +200,7 @@ import org.apache.nifi.web.api.dto.BulletinDTO;
 import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.CountersDTO;
 import org.apache.nifi.web.api.dto.DropRequestDTO;
 import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@@ -234,6 +235,7 @@ import 
org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.ControllerStatusEntity;
+import org.apache.nifi.web.api.entity.CountersEntity;
 import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.FlowSnippetEntity;
 import org.apache.nifi.web.api.entity.ListingRequestEntity;
@@ -337,7 +339,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     public static final Pattern PROVENANCE_QUERY_URI = 
Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
     public static final Pattern PROVENANCE_EVENT_URI = 
Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
 
-    public static final Pattern COUNTERS_URI = 
Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
     public static final String CONTROLLER_SERVICES_URI = 
"/nifi-api/controller/controller-services/node";
     public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
     public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/state");
@@ -347,6 +348,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state");
     public static final Pattern BULLETIN_BOARD_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/bulletin-board");
     public static final Pattern SYSTEM_DIAGNOSTICS_URI_PATTERN = 
Pattern.compile("/nifi-api/system-diagnostics");
+    public static final Pattern COUNTERS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/counters");
+    public static final Pattern COUNTER_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
 
     public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN =
         
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status/history");
@@ -2469,6 +2472,10 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return "GET".equalsIgnoreCase(method) && 
SYSTEM_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
+    private static boolean isCountersEndpoint(final URI uri, final String 
method) {
+        return "GET".equalsIgnoreCase(method) && 
COUNTERS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
 
     private static boolean isRemoteProcessGroupEndpoint(final URI uri, final 
String method) {
         if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) {
@@ -2503,8 +2510,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return false;
     }
 
-    private static boolean isCountersEndpoint(final URI uri) {
-        return COUNTERS_URI.matcher(uri.getPath()).matches();
+    private static boolean isCounterEndpoint(final URI uri) {
+        return COUNTER_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
     private static boolean isControllerServicesEndpoint(final URI uri, final 
String method) {
@@ -2576,7 +2583,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                 || isGroupStatusEndpoint(uri, method) || 
isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, 
method)
                 || isProcessorStatusHistoryEndpoint(uri, method) || 
isProcessGroupStatusHistoryEndpoint(uri, method)
                 || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || 
isConnectionStatusHistoryEndpoint(uri, method)
-                || isBulletinBoardEndpoint(uri, method) || 
isSystemDiagnosticsEndpoint(uri, method);
+                || isBulletinBoardEndpoint(uri, method) || 
isSystemDiagnosticsEndpoint(uri, method)
+                || isCountersEndpoint(uri, method);
     }
 
     private void mergeProcessorValidationErrors(final ProcessorDTO processor, 
Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2638,6 +2646,15 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         }
     }
 
+    private void mergeCounters(final CountersDTO target, final 
Map<NodeIdentifier, CountersDTO> resultMap) {
+        for (final Map.Entry<NodeIdentifier, CountersDTO> entry : 
resultMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final CountersDTO toMerge = entry.getValue();
+
+            StatusMerger.merge(target, toMerge, nodeId.getId(), 
nodeId.getApiAddress(), nodeId.getApiPort());
+        }
+    }
+
     private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final 
Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) {
         ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto;
         for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : 
resultMap.entrySet()) {
@@ -3870,6 +3887,24 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             mergeSystemDiagnostics(responseDto, resultsMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isCountersEndpoint(uri, 
method)) {
+            final CountersEntity responseEntity = 
clientResponse.getClientResponse().getEntity(CountersEntity.class);
+            final CountersDTO responseDto = responseEntity.getCounters();
+
+            final Map<NodeIdentifier, CountersDTO> resultsMap = new 
HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final CountersEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(CountersEntity.class);
+                final CountersDTO nodeStatus = 
nodeResponseEntity.getCounters();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeCounters(responseDto, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);
@@ -3980,7 +4015,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
      * @return Whether all problematic node responses were due to a missing 
counter
      */
     private boolean isMissingCounter(final Set<NodeResponse> 
problematicNodeResponses, final URI uri) {
-        if (isCountersEndpoint(uri)) {
+        if (isCounterEndpoint(uri)) {
             boolean notFound = true;
             for (final NodeResponse problematicResponse : 
problematicNodeResponses) {
                 if (problematicResponse.getStatus() != 404) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 385ffd9..563e7aa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -49,7 +49,6 @@ import org.apache.nifi.authorization.Authority;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
-import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
@@ -104,6 +103,7 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
 import org.apache.nifi.web.api.dto.CounterDTO;
 import org.apache.nifi.web.api.dto.CountersDTO;
+import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
 import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
 import org.apache.nifi.web.api.dto.DropRequestDTO;
 import org.apache.nifi.web.api.dto.DtoFactory;
@@ -2140,53 +2140,17 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
 
     @Override
     public CountersDTO getCounters() {
-        if (properties.isClusterManager()) {
-            final Map<String, CounterDTO> mergedCountersMap = new HashMap<>();
-            final Set<Node> connectedNodes = 
clusterManager.getNodes(Node.Status.CONNECTED);
-
-            if (connectedNodes.isEmpty()) {
-                throw new NoConnectedNodesException();
-            }
-
-            for (final Node node : connectedNodes) {
-                final HeartbeatPayload nodeHeartbeatPayload = 
node.getHeartbeatPayload();
-                if (nodeHeartbeatPayload == null) {
-                    continue;
-                }
-                final List<Counter> nodeCounters = 
node.getHeartbeatPayload().getCounters();
-                if (nodeCounters == null) {
-                    continue;
-                }
-
-                // for each node, add its counter values to the aggregate 
values
-                for (final Counter nodeCounter : nodeCounters) {
-                    final CounterDTO mergedCounter = 
mergedCountersMap.get(nodeCounter.getIdentifier());
-
-                    // either create a new aggregate counter or update the 
aggregate counter
-                    if (mergedCounter == null) {
-                        // add new counter
-                        mergedCountersMap.put(nodeCounter.getIdentifier(), 
dtoFactory.createCounterDto(nodeCounter));
-                    } else {
-                        // update aggregate counter
-                        
mergedCounter.setValueCount(mergedCounter.getValueCount() + 
nodeCounter.getValue());
-                        
mergedCounter.setValue(FormatUtils.formatCount(mergedCounter.getValueCount()));
-                    }
-                }
-            }
-
-            final CountersDTO mergedCounters = new CountersDTO();
-            mergedCounters.setGenerated(new Date());
-            mergedCounters.setCounters(mergedCountersMap.values());
-            return mergedCounters;
-        } else {
-            List<Counter> counters = controllerFacade.getCounters();
-            Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size());
-            for (Counter counter : counters) {
-                counterDTOs.add(dtoFactory.createCounterDto(counter));
-            }
-            return dtoFactory.createCountersDto(counterDTOs);
+        List<Counter> counters = controllerFacade.getCounters();
+        Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size());
+        for (Counter counter : counters) {
+            counterDTOs.add(dtoFactory.createCounterDto(counter));
         }
 
+        final CountersSnapshotDTO snapshotDto = 
dtoFactory.createCountersDto(counterDTOs);
+        final CountersDTO countersDto = new CountersDTO();
+        countersDto.setAggregateSnapshot(snapshotDto);
+
+        return countersDto;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index 36b25f2..91ac843 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -578,6 +578,11 @@ public class ControllerResource extends 
ApplicationResource {
             )
             @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId) {
 
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
         final CountersDTO countersReport = serviceFacade.getCounters();
 
         // create the revision

http://git-wip-us.apache.org/repos/asf/nifi/blob/c335d810/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index c9355a2..5c6ea7f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -351,8 +351,8 @@ public final class DtoFactory {
      * @param counterDtos dtos
      * @return dto
      */
-    public CountersDTO createCountersDto(final Collection<CounterDTO> 
counterDtos) {
-        final CountersDTO dto = new CountersDTO();
+    public CountersSnapshotDTO createCountersDto(final Collection<CounterDTO> 
counterDtos) {
+        final CountersSnapshotDTO dto = new CountersSnapshotDTO();
         dto.setCounters(counterDtos);
         dto.setGenerated(new Date());
         return dto;

Reply via email to