Repository: nifi
Updated Branches:
  refs/heads/master fdea876ed -> af2513adf


NIFI-1295:
- Adding UI controls for terminating hung threads.
- Showing current number of terminated threads.
- Fixing issue when replicating terminate threads request throughout the 
cluster.

This closes #2607.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/af2513ad
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/af2513ad
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/af2513ad

Branch: refs/heads/master
Commit: af2513adf843f26b4e80a1526e67095a69d85adf
Parents: fdea876
Author: Matt Gilman <[email protected]>
Authored: Thu Apr 5 10:56:54 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Apr 25 16:23:34 2018 -0400

----------------------------------------------------------------------
 .../controller/status/ProcessGroupStatus.java   | 14 ++++
 .../nifi/controller/status/ProcessorStatus.java | 12 ++++
 .../web/api/dto/status/ControllerStatusDTO.java | 16 +++++
 .../status/ProcessGroupStatusSnapshotDTO.java   | 16 ++++-
 .../dto/status/ProcessorStatusSnapshotDTO.java  | 14 ++++
 .../nifi/cluster/manager/StatusMerger.java      | 25 ++++---
 .../apache/nifi/controller/FlowController.java  | 72 ++++++++++++++++++++
 .../repository/StandardProcessSession.java      | 61 +++++++++--------
 .../apache/nifi/web/api/ProcessorResource.java  | 48 +++++++------
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  4 ++
 .../nifi/web/controller/ControllerFacade.java   | 20 +++---
 .../WEB-INF/partials/canvas/flow-status.jsp     | 57 ++++++++++++----
 .../nifi-web-ui/src/main/webapp/css/main.css    |  2 +-
 .../nf-ng-canvas-flow-status-controller.js      | 45 +++++++++---
 .../src/main/webapp/js/nf/canvas/nf-actions.js  | 19 ++++++
 .../main/webapp/js/nf/canvas/nf-canvas-utils.js | 51 ++++++++++++--
 .../main/webapp/js/nf/canvas/nf-context-menu.js | 25 +++++++
 .../webapp/js/nf/canvas/nf-process-group.js     |  6 +-
 .../main/webapp/js/nf/canvas/nf-processor.js    |  8 +--
 .../webapp/js/nf/summary/nf-summary-table.js    | 28 ++++++--
 20 files changed, 423 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
index e07d1c1..a6acbc1 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
@@ -35,6 +35,7 @@ public class ProcessGroupStatus implements Cloneable {
     private Integer outputCount;
     private Long outputContentSize;
     private Integer activeThreadCount;
+    private Integer terminatedThreadCount;
     private Integer queuedCount;
     private Long queuedContentSize;
     private Long bytesRead;
@@ -149,6 +150,14 @@ public class ProcessGroupStatus implements Cloneable {
         this.activeThreadCount = activeThreadCount;
     }
 
+    public Integer getTerminatedThreadCount() {
+        return terminatedThreadCount;
+    }
+
+    public void setTerminatedThreadCount(Integer terminatedThreadCount) {
+        this.terminatedThreadCount = terminatedThreadCount;
+    }
+
     public Collection<ConnectionStatus> getConnectionStatus() {
         return connectionStatus;
     }
@@ -257,6 +266,7 @@ public class ProcessGroupStatus implements Cloneable {
         clonedObj.inputContentSize = inputContentSize;
         clonedObj.inputCount = inputCount;
         clonedObj.activeThreadCount = activeThreadCount;
+        clonedObj.terminatedThreadCount = terminatedThreadCount;
         clonedObj.queuedContentSize = queuedContentSize;
         clonedObj.queuedCount = queuedCount;
         clonedObj.bytesRead = bytesRead;
@@ -334,6 +344,8 @@ public class ProcessGroupStatus implements Cloneable {
         builder.append(outputContentSize);
         builder.append(", activeThreadCount=");
         builder.append(activeThreadCount);
+        builder.append(", terminatedThreadCount=");
+        builder.append(terminatedThreadCount);
         builder.append(", flowFilesTransferred=");
         builder.append(flowFilesTransferred);
         builder.append(", bytesTransferred=");
@@ -403,6 +415,7 @@ public class ProcessGroupStatus implements Cloneable {
         target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
         target.setBytesWritten(target.getBytesWritten() + 
toMerge.getBytesWritten());
         target.setActiveThreadCount(target.getActiveThreadCount() + 
toMerge.getActiveThreadCount());
+        target.setTerminatedThreadCount(target.getTerminatedThreadCount() + 
toMerge.getTerminatedThreadCount());
         target.setFlowFilesTransferred(target.getFlowFilesTransferred() + 
toMerge.getFlowFilesTransferred());
         target.setBytesTransferred(target.getBytesTransferred() + 
toMerge.getBytesTransferred());
         target.setFlowFilesReceived(target.getFlowFilesReceived() + 
toMerge.getFlowFilesReceived());
@@ -452,6 +465,7 @@ public class ProcessGroupStatus implements Cloneable {
             }
 
             merged.setActiveThreadCount(merged.getActiveThreadCount() + 
statusToMerge.getActiveThreadCount());
+            merged.setTerminatedThreadCount(merged.getTerminatedThreadCount() 
+ statusToMerge.getTerminatedThreadCount());
             merged.setBytesRead(merged.getBytesRead() + 
statusToMerge.getBytesRead());
             merged.setBytesWritten(merged.getBytesWritten() + 
statusToMerge.getBytesWritten());
             merged.setInputBytes(merged.getInputBytes() + 
statusToMerge.getInputBytes());

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java 
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
index 808f2f6..93a6d87 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
@@ -43,6 +43,7 @@ public class ProcessorStatus implements Cloneable {
     private int flowFilesRemoved;
     private long averageLineageDuration;
     private int activeThreadCount;
+    private int terminatedThreadCount;
     private int flowFilesReceived;
     private long bytesReceived;
     private int flowFilesSent;
@@ -193,6 +194,14 @@ public class ProcessorStatus implements Cloneable {
         this.activeThreadCount = activeThreadCount;
     }
 
+    public int getTerminatedThreadCount() {
+        return terminatedThreadCount;
+    }
+
+    public void setTerminatedThreadCount(int terminatedThreadCount) {
+        this.terminatedThreadCount = terminatedThreadCount;
+    }
+
     public int getFlowFilesReceived() {
         return flowFilesReceived;
     }
@@ -237,6 +246,7 @@ public class ProcessorStatus implements Cloneable {
     public ProcessorStatus clone() {
         final ProcessorStatus clonedObj = new ProcessorStatus();
         clonedObj.activeThreadCount = activeThreadCount;
+        clonedObj.terminatedThreadCount = terminatedThreadCount;
         clonedObj.bytesRead = bytesRead;
         clonedObj.bytesWritten = bytesWritten;
         clonedObj.flowFilesReceived = flowFilesReceived;
@@ -294,6 +304,8 @@ public class ProcessorStatus implements Cloneable {
         builder.append(processingNanos);
         builder.append(", activeThreadCount=");
         builder.append(activeThreadCount);
+        builder.append(", terminatedThreadCount=");
+        builder.append(terminatedThreadCount);
         builder.append(", counters=");
         builder.append(counters);
         builder.append("]");

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
index cddf85e..12c6530 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlType;
 public class ControllerStatusDTO implements Cloneable {
 
     private Integer activeThreadCount = 0;
+    private Integer terminatedThreadCount = 0;
     private String queued;
     private Integer flowFilesQueued = 0;
     private Long bytesQueued = 0L;
@@ -59,6 +60,20 @@ public class ControllerStatusDTO implements Cloneable {
     }
 
     /**
+     * The terminated thread count.
+     *
+     * @return The terminated thread count
+     */
+    @ApiModelProperty("The number of terminated threads in the NiFi.")
+    public Integer getTerminatedThreadCount() {
+        return terminatedThreadCount;
+    }
+
+    public void setTerminatedThreadCount(Integer terminatedThreadCount) {
+        this.terminatedThreadCount = terminatedThreadCount;
+    }
+
+    /**
      * @return queue for the controller
      */
     @ApiModelProperty("The number of flowfiles queued in the NiFi.")
@@ -209,6 +224,7 @@ public class ControllerStatusDTO implements Cloneable {
     public ControllerStatusDTO clone() {
         final ControllerStatusDTO other = new ControllerStatusDTO();
         other.setActiveThreadCount(getActiveThreadCount());
+        other.setTerminatedThreadCount(getTerminatedThreadCount());
         other.setQueued(getQueued());
         other.setFlowFilesQueued(getFlowFilesQueued());
         other.setBytesQueued(getBytesQueued());

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
index c701ed6..f1b324e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java
@@ -77,6 +77,7 @@ public class ProcessGroupStatusSnapshotDTO implements 
Cloneable {
     private String sent;
 
     private Integer activeThreadCount = 0;
+    private Integer terminatedThreadCount = 0;
 
     /**
      * The id for the process group.
@@ -128,11 +129,23 @@ public class ProcessGroupStatusSnapshotDTO implements 
Cloneable {
     }
 
     /**
+     * @return number of threads currently terminated for this process group
+     */
+    @ApiModelProperty("The number of threads currently terminated for the 
process group.")
+    public Integer getTerminatedThreadCount() {
+        return terminatedThreadCount;
+    }
+
+    public void setTerminatedThreadCount(Integer terminatedThreadCount) {
+        this.terminatedThreadCount = terminatedThreadCount;
+    }
+
+    /**
      * The status of all connections in this process group.
      *
      * @return The status of all connections
      */
-    @ApiModelProperty("The status of all conenctions in the process group.")
+    @ApiModelProperty("The status of all connections in the process group.")
     public Collection<ConnectionStatusSnapshotEntity> 
getConnectionStatusSnapshots() {
         return connectionStatusSnapshots;
     }
@@ -523,6 +536,7 @@ public class ProcessGroupStatusSnapshotDTO implements 
Cloneable {
         other.setSent(getSent());
 
         other.setActiveThreadCount(getActiveThreadCount());
+        other.setTerminatedThreadCount(getTerminatedThreadCount());
 
         
other.setConnectionStatusSnapshots(copy(getConnectionStatusSnapshots()));
         other.setProcessorStatusSnapshots(copy(getProcessorStatusSnapshots()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
index 3d17e94..9c1fd59 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java
@@ -51,6 +51,7 @@ public class ProcessorStatusSnapshotDTO implements Cloneable {
     private String tasks;
     private String tasksDuration;
     private Integer activeThreadCount = 0;
+    private Integer terminatedThreadCount = 0;
 
     /* getters / setters */
     /**
@@ -189,6 +190,18 @@ public class ProcessorStatusSnapshotDTO implements 
Cloneable {
     }
 
     /**
+     * @return number of threads currently terminated for this Processor
+     */
+    @ApiModelProperty("The number of threads currently terminated for the 
processor.")
+    public Integer getTerminatedThreadCount() {
+        return terminatedThreadCount;
+    }
+
+    public void setTerminatedThreadCount(Integer terminatedThreadCount) {
+        this.terminatedThreadCount = terminatedThreadCount;
+    }
+
+    /**
      * @return number of task this connectable has had over the last 5 minutes
      */
     @ApiModelProperty("The total number of task this connectable has completed 
over the last 5 minutes.")
@@ -304,6 +317,7 @@ public class ProcessorStatusSnapshotDTO implements 
Cloneable {
         other.setTasksDuration(getTasksDuration());
         other.setTasksDurationNanos(getTasksDurationNanos());
         other.setActiveThreadCount(getActiveThreadCount());
+        other.setTerminatedThreadCount(getTerminatedThreadCount());
         other.setInput(getInput());
         other.setOutput(getOutput());
         other.setRead(getRead());

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index 3ce3973..dd10b5b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -17,17 +17,6 @@
 
 package org.apache.nifi.cluster.manager;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.controller.status.RunStatus;
 import org.apache.nifi.controller.status.TransmissionStatus;
 import org.apache.nifi.registry.flow.VersionedFlowState;
@@ -69,6 +58,17 @@ import 
org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 public class StatusMerger {
     private static final String ZERO_COUNT = "0";
     private static final String ZERO_BYTES = "0 bytes";
@@ -82,6 +82,7 @@ public class StatusMerger {
         }
 
         target.setActiveThreadCount(target.getActiveThreadCount() + 
toMerge.getActiveThreadCount());
+        target.setTerminatedThreadCount(target.getTerminatedThreadCount() + 
toMerge.getTerminatedThreadCount());
         target.setBytesQueued(target.getBytesQueued() + 
toMerge.getBytesQueued());
         target.setFlowFilesQueued(target.getFlowFilesQueued() + 
toMerge.getFlowFilesQueued());
 
@@ -158,6 +159,7 @@ public class StatusMerger {
         target.setFlowFilesSent(target.getFlowFilesSent() + 
toMerge.getFlowFilesSent());
 
         target.setActiveThreadCount(target.getActiveThreadCount() + 
toMerge.getActiveThreadCount());
+        target.setTerminatedThreadCount(target.getTerminatedThreadCount() + 
toMerge.getTerminatedThreadCount());
         updatePrettyPrintedFields(target);
 
         // connection status
@@ -431,6 +433,7 @@ public class StatusMerger {
         target.setTaskCount(target.getTaskCount() + toMerge.getTaskCount());
         target.setTasksDurationNanos(target.getTasksDurationNanos() + 
toMerge.getTasksDurationNanos());
         target.setActiveThreadCount(target.getActiveThreadCount() + 
toMerge.getActiveThreadCount());
+        target.setTerminatedThreadCount(target.getTerminatedThreadCount() + 
toMerge.getTerminatedThreadCount());
         updatePrettyPrintedFields(target);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index c2b7c7a..ba4075e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2780,6 +2780,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         status.setId(group.getIdentifier());
         status.setName(isAuthorized.evaluate(group) ? group.getName() : 
group.getIdentifier());
         int activeGroupThreads = 0;
+        int terminatedGroupThreads = 0;
         long bytesRead = 0L;
         long bytesWritten = 0L;
         int queuedCount = 0;
@@ -2802,6 +2803,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             final ProcessorStatus procStat = getProcessorStatus(statusReport, 
procNode, isAuthorized);
             processorStatusCollection.add(procStat);
             activeGroupThreads += procStat.getActiveThreadCount();
+            terminatedGroupThreads += procStat.getTerminatedThreadCount();
             bytesRead += procStat.getBytesRead();
             bytesWritten += procStat.getBytesWritten();
 
@@ -2818,6 +2820,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             final ProcessGroupStatus childGroupStatus = 
getGroupStatus(childGroup, statusReport, isAuthorized);
             localChildGroupStatusCollection.add(childGroupStatus);
             activeGroupThreads += childGroupStatus.getActiveThreadCount();
+            terminatedGroupThreads += 
childGroupStatus.getTerminatedThreadCount();
             bytesRead += childGroupStatus.getBytesRead();
             bytesWritten += childGroupStatus.getBytesWritten();
             queuedCount += childGroupStatus.getQueuedCount();
@@ -3042,6 +3045,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
 
         status.setActiveThreadCount(activeGroupThreads);
+        status.setTerminatedThreadCount(terminatedGroupThreads);
         status.setBytesRead(bytesRead);
         status.setBytesWritten(bytesWritten);
         status.setQueuedCount(queuedCount);
@@ -3216,6 +3220,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
 
         status.setExecutionNode(procNode.getExecutionNode());
+        status.setTerminatedThreadCount(procNode.getTerminatedThreadCount());
         
status.setActiveThreadCount(processScheduler.getActiveThreadCount(procNode));
 
         return status;
@@ -3848,6 +3853,73 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return new QueueSize(count, contentSize);
     }
 
+    public class GroupStatusCounts {
+        private int queuedCount = 0;
+        private long queuedContentSize = 0;
+        private int activeThreadCount = 0;
+        private int terminatedThreadCount = 0;
+
+        public GroupStatusCounts(final ProcessGroup group) {
+            calculateCounts(group);
+        }
+
+        private void calculateCounts(final ProcessGroup group) {
+            for (final Connection connection : group.getConnections()) {
+                final QueueSize size = connection.getFlowFileQueue().size();
+                queuedCount += size.getObjectCount();
+                queuedContentSize += size.getByteCount();
+
+                final Connectable source = connection.getSource();
+                if 
(ConnectableType.REMOTE_OUTPUT_PORT.equals(source.getConnectableType())) {
+                    final RemoteGroupPort remoteOutputPort = (RemoteGroupPort) 
source;
+                    activeThreadCount += 
processScheduler.getActiveThreadCount(remoteOutputPort);
+                }
+
+                final Connectable destination = connection.getDestination();
+                if 
(ConnectableType.REMOTE_INPUT_PORT.equals(destination.getConnectableType())) {
+                    final RemoteGroupPort remoteInputPort = (RemoteGroupPort) 
destination;
+                    activeThreadCount += 
processScheduler.getActiveThreadCount(remoteInputPort);
+                }
+            }
+            for (final ProcessorNode processor : group.getProcessors()) {
+                activeThreadCount += 
processScheduler.getActiveThreadCount(processor);
+                terminatedThreadCount += processor.getTerminatedThreadCount();
+            }
+            for (final Port port : group.getInputPorts()) {
+                activeThreadCount += 
processScheduler.getActiveThreadCount(port);
+            }
+            for (final Port port : group.getOutputPorts()) {
+                activeThreadCount += 
processScheduler.getActiveThreadCount(port);
+            }
+            for (final Funnel funnel : group.getFunnels()) {
+                activeThreadCount += 
processScheduler.getActiveThreadCount(funnel);
+            }
+            for (final ProcessGroup childGroup : group.getProcessGroups()) {
+                calculateCounts(childGroup);
+            }
+        }
+
+        public int getQueuedCount() {
+            return queuedCount;
+        }
+
+        public long getQueuedContentSize() {
+            return queuedContentSize;
+        }
+
+        public int getActiveThreadCount() {
+            return activeThreadCount;
+        }
+
+        public int getTerminatedThreadCount() {
+            return terminatedThreadCount;
+        }
+    }
+
+    public GroupStatusCounts getGroupStatusCounts(final ProcessGroup group) {
+        return new GroupStatusCounts(group);
+    }
+
     public int getActiveThreadCount() {
         final int timerDrivenCount = 
timerDrivenEngineRef.get().getActiveCount();
         final int eventDrivenCount = 
eventDrivenSchedulingAgent.getActiveThreadCount();

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index eed4dbe..bf9b174 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -16,34 +16,6 @@
  */
 package org.apache.nifi.controller.repository;
 
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessorNode;
@@ -86,6 +58,34 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 /**
  * <p>
  * Provides a ProcessSession that ensures all accesses, changes and transfers
@@ -330,7 +330,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     @Override
-    public void commit() {
+    public synchronized void commit() {
         verifyTaskActive();
         checkpoint();
         commit(this.checkpoint);
@@ -946,7 +946,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         verifyTaskActive();
     }
 
-    private void rollback(final boolean penalize, final boolean 
rollbackCheckpoint) {
+    private synchronized void rollback(final boolean penalize, final boolean 
rollbackCheckpoint) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("{} session rollback called, FlowFile records are {} {}",
                     this, loggableFlowfileInfo(), new Throwable("Stack Trace 
on rollback"));
@@ -1163,6 +1163,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     private void acknowledgeRecords() {
         for (final Map.Entry<FlowFileQueue, Set<FlowFileRecord>> entry : 
unacknowledgedFlowFiles.entrySet()) {
+            LOG.trace("Acknowledging {} for {}", entry.getValue(), 
entry.getKey());
             entry.getKey().acknowledge(entry.getValue());
         }
         unacknowledgedFlowFiles.clear();

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index 307ac4f..c56be1b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -16,23 +16,12 @@
  */
 package org.apache.nifi.web.api;
 
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
 import org.apache.nifi.authorization.Authorizer;
@@ -59,16 +48,25 @@ import 
org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import java.util.List;
 import java.util.Set;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
-
 /**
  * RESTful endpoint for managing a Processor.
  */
@@ -229,7 +227,7 @@ public class ProcessorResource extends ApplicationResource {
             @ApiParam(value = "The processor id.", required = true) 
@PathParam("id") final String id) {
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.POST);
+            return replicate(HttpMethod.DELETE);
         }
 
         final ProcessorEntity requestProcessorEntity = new ProcessorEntity();

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 85d2876..26bf442 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1002,7 +1002,10 @@ public final class DtoFactory {
         snapshot.setBytesSent(processGroupStatus.getBytesSent());
         
snapshot.setFlowFilesReceived(processGroupStatus.getFlowFilesReceived());
         snapshot.setBytesReceived(processGroupStatus.getBytesReceived());
+
         
snapshot.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
+        
snapshot.setTerminatedThreadCount(processGroupStatus.getTerminatedThreadCount());
+
         StatusMerger.updatePrettyPrintedFields(snapshot);
         return processGroupStatusDto;
     }
@@ -1167,6 +1170,7 @@ public final class DtoFactory {
         snapshot.setExecutionNode(procStatus.getExecutionNode().toString());
 
         snapshot.setActiveThreadCount(procStatus.getActiveThreadCount());
+        
snapshot.setTerminatedThreadCount(procStatus.getTerminatedThreadCount());
         snapshot.setType(procStatus.getType());
 
         StatusMerger.updatePrettyPrintedFields(snapshot);

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 58918ae..9663d3b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -16,10 +16,6 @@
  */
 package org.apache.nifi.web.controller;
 
-import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
-
-import javax.ws.rs.WebApplicationException;
-
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -45,11 +41,11 @@ import org.apache.nifi.controller.ContentAvailability;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Counter;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.FlowController.GroupStatusCounts;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.label.Label;
-import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.ContentNotFoundException;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.service.ControllerServiceNode;
@@ -113,6 +109,7 @@ import 
org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.WebApplicationException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.text.Collator;
@@ -136,6 +133,8 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
+
 public class ControllerFacade implements Authorizable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ControllerFacade.class);
@@ -572,13 +571,14 @@ public class ControllerFacade implements Authorizable {
      */
     public ControllerStatusDTO getControllerStatus() {
         final ProcessGroup rootGroup = 
flowController.getGroup(flowController.getRootGroupId());
+        final GroupStatusCounts groupStatusCounts = 
flowController.getGroupStatusCounts(rootGroup);
 
-        final QueueSize controllerQueueSize = 
flowController.getTotalFlowFileCount(rootGroup);
         final ControllerStatusDTO controllerStatus = new ControllerStatusDTO();
-        
controllerStatus.setActiveThreadCount(flowController.getActiveThreadCount());
-        
controllerStatus.setQueued(FormatUtils.formatCount(controllerQueueSize.getObjectCount())
 + " / " + FormatUtils.formatDataSize(controllerQueueSize.getByteCount()));
-        controllerStatus.setBytesQueued(controllerQueueSize.getByteCount());
-        
controllerStatus.setFlowFilesQueued(controllerQueueSize.getObjectCount());
+        
controllerStatus.setActiveThreadCount(groupStatusCounts.getActiveThreadCount());
+        
controllerStatus.setTerminatedThreadCount(groupStatusCounts.getTerminatedThreadCount());
+        
controllerStatus.setQueued(FormatUtils.formatCount(groupStatusCounts.getQueuedCount())
 + " / " + 
FormatUtils.formatDataSize(groupStatusCounts.getQueuedContentSize()));
+        
controllerStatus.setBytesQueued(groupStatusCounts.getQueuedContentSize());
+        
controllerStatus.setFlowFilesQueued(groupStatusCounts.getQueuedCount());
 
         final ProcessGroupCounts counts = rootGroup.getCounts();
         controllerStatus.setRunningCount(counts.getRunningCount());

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/flow-status.jsp
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/flow-status.jsp
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/flow-status.jsp
index 5edfe10..1db3396 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/flow-status.jsp
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/flow-status.jsp
@@ -17,23 +17,52 @@
 <%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %>
 <div id="flow-status" flex layout="row" layout-align="space-between center">
     <div id="flow-status-container" layout="row" layout-align="space-around 
center">
-        <div class="fa fa-cubes" 
ng-if="appCtrl.nf.ClusterSummary.isClustered()" title="Connected nodes / Total 
number of nodes in the cluster"><span 
id="connected-nodes-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.connectedNodesCount}}</span></div>
-        <div class="icon icon-threads" title="Active threads"><span 
id="active-thread-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.activeThreadCount}}</span></div>
-        <div class="fa fa-list" title="Total queued data"><span 
id="total-queued">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.totalQueued}}</span></div>
-        <div class="fa fa-bullseye" title="Transmitting Remote Process 
Groups"><span 
id="controller-transmitting-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerTransmittingCount}}</span></div>
-        <div class="icon icon-transmit-false" title="Not Transmitting Remote 
Process Groups"><span 
id="controller-not-transmitting-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerNotTransmittingCount}}</span></div>
-        <div class="fa fa-play" title="Running Components"><span 
id="controller-running-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerRunningCount}}</span></div>
-        <div class="fa fa-stop" title="Stopped Components"><span 
id="controller-stopped-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerStoppedCount}}</span></div>
-        <div class="fa fa-warning" title="Invalid Components"><span 
id="controller-invalid-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerInvalidCount}}</span></div>
-        <div class="icon icon-enable-false" title="Disabled Components"><span 
id="controller-disabled-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerDisabledCount}}</span></div>
-        <div class="fa fa-check" title="Up to date Versioned Process 
Groups"><span 
id="controller-up-to-date-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerUpToDateCount}}</span></div>
-        <div class="fa fa-asterisk" title="Locally modified Versioned Process 
Groups"><span 
id="controller-locally-modified-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerLocallyModifiedCount}}</span></div>
-        <div class="fa fa-arrow-circle-up" title="Stale Versioned Process 
Groups"><span 
id="controller-stale-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerStaleCount}}</span></div>
+        <div class="fa fa-cubes" 
ng-if="appCtrl.nf.ClusterSummary.isClustered()" title="Connected nodes / Total 
number of nodes in the cluster">
+            <span 
id="connected-nodes-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.connectedNodesCount}}</span>
+        </div>
+        <div class="icon icon-threads" 
ng-class="appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.getExtraThreadStyles()"
+             title="Active 
Threads{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.hasTerminatedThreads()
 ? ' (Terminated)' : ''}}">
+            <span 
id="active-thread-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.threadCounts}}</span>
+        </div>
+        <div class="fa fa-list" title="Total queued data">
+            <span 
id="total-queued">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.totalQueued}}</span>
+        </div>
+        <div class="fa fa-bullseye" title="Transmitting Remote Process Groups">
+            <span 
id="controller-transmitting-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerTransmittingCount}}</span>
+        </div>
+        <div class="icon icon-transmit-false" title="Not Transmitting Remote 
Process Groups">
+            <span 
id="controller-not-transmitting-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerNotTransmittingCount}}</span>
+        </div>
+        <div class="fa fa-play" title="Running Components">
+            <span 
id="controller-running-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerRunningCount}}</span>
+        </div>
+        <div class="fa fa-stop" title="Stopped Components">
+            <span 
id="controller-stopped-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerStoppedCount}}</span>
+        </div>
+        <div class="fa fa-warning" title="Invalid Components">
+            <span 
id="controller-invalid-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerInvalidCount}}</span>
+        </div>
+        <div class="icon icon-enable-false" title="Disabled Components">
+            <span 
id="controller-disabled-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerDisabledCount}}</span>
+        </div>
+        <div class="fa fa-check" title="Up to date Versioned Process Groups">
+            <span 
id="controller-up-to-date-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerUpToDateCount}}</span>
+        </div>
+        <div class="fa fa-asterisk" title="Locally modified Versioned Process 
Groups">
+            <span 
id="controller-locally-modified-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerLocallyModifiedCount}}</span>
+        </div>
+        <div class="fa fa-arrow-circle-up" title="Stale Versioned Process 
Groups">
+            <span 
id="controller-stale-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerStaleCount}}</span>
+        </div>
         <div class="fa fa-exclamation-circle" title="Locally modified and 
stale Versioned Process Groups">
             <span 
id="controller-locally-modified-and-stale-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerLocallyModifiedAndStaleCount}}</span>
         </div>
-        <div class="fa fa-question" title="Sync failure Versioned Process 
Groups"><span 
id="controller-sync-failure-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerSyncFailureCount}}</span></div>
-        <div class="fa fa-refresh" title="Last refresh"><span 
id="stats-last-refreshed">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.statsLastRefreshed}}</span></div>
+        <div class="fa fa-question" title="Sync failure Versioned Process 
Groups">
+            <span 
id="controller-sync-failure-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerSyncFailureCount}}</span>
+        </div>
+        <div class="fa fa-refresh" title="Last refresh">
+            <span 
id="stats-last-refreshed">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.statsLastRefreshed}}</span>
+        </div>
         <div id="canvas-loading-container" class="loading-container"></div>
     </div>
     <div layout="row" layout-align="end center">

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/main.css
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/main.css
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/main.css
index 2f8dd7b..0cd66c2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/main.css
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/main.css
@@ -182,7 +182,7 @@ div.valid {
     background-color: transparent;
 }
 
-div.has-bulletins {
+div.has-bulletins, div.warning {
     color: #ba554a !important;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js
index 67b9d10..f301c9a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js
@@ -67,6 +67,8 @@
         function FlowStatusCtrl() {
             this.connectedNodesCount = "-";
             this.activeThreadCount = "-";
+            this.terminatedThreadCount = "-";
+            this.threadCounts = "-";
             this.totalQueued = "-";
             this.controllerTransmittingCount = "-";
             this.controllerNotTransmittingCount = "-";
@@ -437,26 +439,47 @@
             },
 
             /**
-             * Update the flow status counts.
+             * Returns whether there are any terminated threads.
              *
-             * @param status  The controller status returned from the 
`../nifi-api/flow/status` endpoint.
+             * @returns {boolean} whether there are any terminated threads
              */
-            update: function (status) {
-                var controllerInvalidCount = 
(nfCommon.isDefinedAndNotNull(status.invalidCount)) ? status.invalidCount : 0;
-
-                if (this.controllerInvalidCount > 0) {
-                    
$('#controller-invalid-count').parent().removeClass('zero').addClass('invalid');
+            hasTerminatedThreads: function () {
+                if (Number.isInteger(this.terminatedThreadCount)) {
+                    return this.terminatedThreadCount > 0;
                 } else {
-                    
$('#controller-invalid-count').parent().removeClass('invalid').addClass('zero');
+                    return false;
                 }
+            },
 
+            /**
+             * Returns any additional styles to apply to the thread counts.
+             *
+             * @returns {string}
+             */
+            getExtraThreadStyles: function () {
+                if (Number.isInteger(this.terminatedThreadCount) && 
this.terminatedThreadCount > 0) {
+                    return 'warning';
+                } else if (this.activeThreadCount === 0) {
+                    return 'zero';
+                }
+
+                return '';
+            },
+
+            /**
+             * Update the flow status counts.
+             *
+             * @param status  The controller status returned from the 
`../nifi-api/flow/status` endpoint.
+             */
+            update: function (status) {
                 // update the report values
                 this.activeThreadCount = status.activeThreadCount;
+                this.terminatedThreadCount = status.terminatedThreadCount;
 
-                if (this.activeThreadCount > 0) {
-                    
$('#flow-status-container').find('.icon-threads').removeClass('zero');
+                if (this.hasTerminatedThreads()) {
+                    this.threadCounts = this.activeThreadCount + ' (' + 
this.terminatedThreadCount + ')';
                 } else {
-                    
$('#flow-status-container').find('.icon-threads').addClass('zero');
+                    this.threadCounts = this.activeThreadCount;
                 }
 
                 this.totalQueued = status.queued;

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index 41e945b..b8587cd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -767,6 +767,25 @@
         },
 
         /**
+         * Terminates active threads for the selected component.
+         *
+         * @param {selection} selection
+         */
+        terminate: function (selection) {
+            if (selection.size() === 1 && 
nfCanvasUtils.isProcessor(selection)) {
+                var selectionData = selection.datum();
+
+                $.ajax({
+                    type: 'DELETE',
+                    url: selectionData.uri + '/threads',
+                    dataType: 'json'
+                }).done(function (response) {
+                    nfProcessor.set(response);
+                }).fail(nfErrorHandler.handleAjaxError);
+            }
+        },
+
+        /**
          * Enables transmission for the components in the specified selection.
          *
          * @argument {selection} selection      The selection

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js
index d6785bb..0b8fd82 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js
@@ -772,21 +772,45 @@
          * @return
          */
         activeThreadCount: function (selection, d, setOffset) {
+            var activeThreads = d.status.aggregateSnapshot.activeThreadCount;
+            var terminatedThreads = 
d.status.aggregateSnapshot.terminatedThreadCount;
+
             // if there is active threads show the count, otherwise hide
-            if (d.status.aggregateSnapshot.activeThreadCount > 0) {
+            if (activeThreads > 0 || terminatedThreads > 0) {
+                var generateThreadsTip = function () {
+                    var tip = activeThreads + ' active threads';
+                    if (terminatedThreads > 0) {
+                        tip += ' (' + terminatedThreads + ' terminated)';
+                    }
+
+                    return tip;
+                };
+
                 // update the active thread count
                 var activeThreadCount = 
selection.select('text.active-thread-count')
                     .text(function () {
-                        return d.status.aggregateSnapshot.activeThreadCount;
+                        if (terminatedThreads > 0) {
+                            return activeThreads + ' (' + terminatedThreads + 
')';
+                        } else {
+                            return activeThreads;
+                        }
                     })
                     .style('display', 'block')
                     .each(function () {
+                        var activeThreadCountText = d3.select(this);
+
                         var bBox = this.getBBox();
-                        d3.select(this).attr('x', function () {
+                        activeThreadCountText.attr('x', function () {
                             return d.dimensions.width - bBox.width - 15;
                         });
+
+                        // reset the active thread count tooltip
+                        activeThreadCountText.selectAll('title').remove();
                     });
 
+                // append the tooltip
+                activeThreadCount.append('title').text(generateThreadsTip);
+
                 // update the background width
                 selection.select('text.active-thread-count-icon')
                     .attr('x', function () {
@@ -799,9 +823,26 @@
 
                         return d.dimensions.width - bBox.width - 20;
                     })
-                    .style('display', 'block');
+                    .style('fill', function () {
+                        if (terminatedThreads > 0) {
+                            return '#ba554a';
+                        } else {
+                            return '#728e9b';
+                        }
+                    })
+                    .style('display', 'block')
+                    .each(function () {
+                        var activeThreadCountIcon = d3.select(this);
+
+                        // reset the active thread count tooltip
+                        activeThreadCountIcon.selectAll('title').remove();
+                    }).append('title').text(generateThreadsTip);
             } else {
-                selection.selectAll('text.active-thread-count, 
text.active-thread-count-icon').style('display', 'none');
+                selection.selectAll('text.active-thread-count, 
text.active-thread-count-icon')
+                    .style('display', 'none')
+                    .each(function () {
+                        d3.select(this).selectAll('title').remove();
+                    });
             }
         },
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
index 5571e08..508651d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
@@ -164,6 +164,30 @@
     };
 
     /**
+     * Determines whether the components in the specified selection can be 
terminated.
+     *
+     * @param {selection} selection         The selections of currently 
selected components
+     */
+    var canTerminate = function (selection) {
+        if (selection.size() !== 1) {
+            return false;
+        }
+
+        if (nfCanvasUtils.canModify(selection) === false) {
+            return false;
+        }
+
+        var terminatable = false;
+        if (nfCanvasUtils.isProcessor(selection)) {
+            var selectionData = selection.datum();
+            var aggregateSnapshot = selectionData.status.aggregateSnapshot;
+            terminatable = aggregateSnapshot.runStatus !== 'Running' && 
aggregateSnapshot.activeThreadCount > 0;
+        }
+
+        return terminatable;
+    };
+
+    /**
      * Determines whether the components in the specified selection support 
stats.
      *
      * @param {selection} selection         The selection of currently 
selected components
@@ -771,6 +795,7 @@
         {separator: true},
         {id: 'start-menu-item', condition: isRunnable, menuItem: {clazz: 'fa 
fa-play', text: 'Start', action: 'start'}},
         {id: 'stop-menu-item', condition: isStoppable, menuItem: {clazz: 'fa 
fa-stop', text: 'Stop', action: 'stop'}},
+        {id: 'terminate-menu-item', condition: canTerminate, menuItem: {clazz: 
'fa fa-hourglass-end', text: 'Terminate', action: 'terminate'}},
         {id: 'enable-menu-item', condition: canEnable, menuItem: {clazz: 'fa 
fa-flash', text: 'Enable', action: 'enable'}},
         {id: 'disable-menu-item', condition: canDisable, menuItem: {clazz: 
'icon icon-enable-false', text: 'Disable', action: 'disable'}},
         {id: 'enable-transmission-menu-item', condition: canStartTransmission, 
menuItem: {clazz: 'fa fa-bullseye', text: 'Enable transmission', action: 
'enableTransmission'}},

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js
index dd147c4..90982f6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js
@@ -172,7 +172,7 @@
             .attrs({
                 'x': 10,
                 'y': 20,
-                'width': 316,
+                'width': 300,
                 'height': 16,
                 'class': 'process-group-name'
             });
@@ -1214,9 +1214,9 @@
                                 if (isUnderVersionControl(processGroupData)) {
                                     var versionControlX = 
parseInt(versionControl.attr('x'), 10);
                                     var processGroupNameX = 
parseInt(d3.select(this).attr('x'), 10);
-                                    return 316 - (processGroupNameX - 
versionControlX);
+                                    return 300 - (processGroupNameX - 
versionControlX);
                                 } else {
-                                    return 316;
+                                    return 300;
                                 }
                             }
                         })

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js
index b30c981..71d6de9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js
@@ -148,7 +148,7 @@
             .attrs({
                 'x': 75,
                 'y': 18,
-                'width': 210,
+                'width': 230,
                 'height': 14,
                 'class': 'processor-name'
             });
@@ -278,7 +278,7 @@
                             'class': 'processor-bundle',
                             'x': 75,
                             'y': 45,
-                            'width': 230,
+                            'width': 200,
                             'height': 12
                         });
 
@@ -547,7 +547,7 @@
                     details.append('text')
                         .attrs({
                             'class': 'active-thread-count-icon',
-                            'y': 45
+                            'y': 46
                         })
                         .text('\ue83f');
 
@@ -555,7 +555,7 @@
                     details.append('text')
                         .attrs({
                             'class': 'active-thread-count',
-                            'y': 45
+                            'y': 46
                         });
 
                     // ---------

http://git-wip-us.apache.org/repos/asf/nifi/blob/af2513ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js
index e54b441..e1311b4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js
@@ -306,9 +306,14 @@
 
         // define a custom formatter for the run status column
         var runStatusFormatter = function (row, cell, value, columnDef, 
dataContext) {
-            var activeThreadCount = '';
-            if (nfCommon.isDefinedAndNotNull(dataContext.activeThreadCount) && 
dataContext.activeThreadCount > 0) {
-                activeThreadCount = '(' + 
nfCommon.escapeHtml(dataContext.activeThreadCount) + ')';
+            var threadCounts = '';
+            var threadTip = '';
+            if (dataContext.terminatedThreadCount > 0) {
+                threadCounts = '(' + dataContext.activeThreadCount + ' / ' + 
dataContext.terminatedThreadCount + ')';
+                threadTip = 'Threads: (Active / Terminated)';
+            } else if (dataContext.activeThreadCount > 0) {
+                threadCounts = '(' + dataContext.activeThreadCount + ')';
+                threadTip = 'Active Threads';
             }
             var classes;
             switch (value.toLowerCase()) {
@@ -330,8 +335,20 @@
                 default:
                     classes = '';
             }
-            var formattedValue = '<div layout="row"><div class="' + classes + 
'"></div>';
-            return formattedValue + '<div class="status-text" 
style="margin-top: 4px;">' + nfCommon.escapeHtml(value) + '</div><div 
style="float: left; margin-left: 4px;">' + 
nfCommon.escapeHtml(activeThreadCount) + '</div></div>';
+
+
+            var markup =
+                '<div layout="row">' +
+                    '<div class="' + classes + '"></div>' +
+                    '<div class="status-text" style="margin-top: 4px;">' +
+                        nfCommon.escapeHtml(value) +
+                    '</div>' +
+                    '<div style="float: left; margin-left: 4px;" title="' + 
threadTip + '">' +
+                        nfCommon.escapeHtml(threadCounts) +
+                    '</div>' +
+                '</div>';
+
+            return markup;
         };
 
         // define the input, read, written, and output columns (reused between 
both tables)
@@ -2694,6 +2711,7 @@
                         node: nodeSnapshot.address + ':' + 
nodeSnapshot.apiPort,
                         runStatus: snapshot.runStatus,
                         activeThreadCount: snapshot.activeThreadCount,
+                        terminatedThreadCount: snapshot.terminatedThreadCount,
                         input: snapshot.input,
                         read: snapshot.read,
                         written: snapshot.written,

Reply via email to