http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
new file mode 100644
index 0000000..89e8aa0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+public enum ProcessorStatusDescriptor {
+    BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesRead",
+        "Bytes Read (5 mins)",
+        "The total number of bytes read from the Content Repository by this 
Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesRead();
+            }
+        })),
+
+    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesWritten",
+        "Bytes Written (5 mins)",
+        "The total number of bytes written to the Content Repository by this 
Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesWritten();
+            }
+        })),
+
+    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesTransferred",
+        "Bytes Transferred (5 mins)",
+        "The total number of bytes read from or written to the Content 
Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesRead() + status.getBytesWritten();
+            }
+        })),
+
+    INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+        "inputBytes",
+        "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that this Processor has pulled 
from its queues in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getInputBytes();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "inputCount",
+        "FlowFiles In (5 mins)",
+        "The number of FlowFiles that this Processor has pulled from its 
queues in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getInputCount());
+            }
+        })),
+    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+        "outputBytes",
+        "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that this Processor has 
transferred to downstream queues in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getOutputBytes();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "outputCount",
+        "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that this Processor has transferred to 
downstream queues in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getOutputCount());
+            }
+        })),
+
+    TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "taskCount",
+        "Tasks (5 mins)",
+        "The number of tasks that this Processor has completed in the past 5 
minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getInvocations());
+            }
+        })),
+
+    TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
+        "taskMillis",
+        "Total Task Duration (5 mins)",
+        "The total number of thread-milliseconds that the Processor has used 
to complete its tasks in the past 5 minutes",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return 
TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), 
TimeUnit.NANOSECONDS);
+            }
+        })),
+
+    FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>(
+        "flowFilesRemoved",
+        "FlowFiles Removed (5 mins)",
+        "The total number of FlowFiles removed by this Processor in the last 5 
minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getFlowFilesRemoved());
+            }
+        })),
+
+    AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
+        "averageLineageDuration",
+        "Average Lineage Duration (5 mins)",
+        "The average amount of time that a FlowFile took to process (from 
receipt until this Processor finished processing it) in the past 5 minutes.",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
+            }
+        }, new ValueReducer<StatusSnapshot, Long>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long millis = 0L;
+                int count = 0;
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long removed = 
snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue();
+                    count += removed;
+
+                    count += 
snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue();
+
+                    final long avgMillis = 
snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    final long totalMillis = avgMillis * removed;
+                    millis += totalMillis;
+                }
+
+                return count == 0 ? 0 : millis / count;
+            }
+        }
+    )),
+
+    AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
+        "averageTaskMillis",
+        "Average Task Duration",
+        "The average duration it took this Processor to complete a task, as 
averaged over the past 5 minutes",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getInvocations() == 0 ? 0 : 
TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), 
TimeUnit.NANOSECONDS) / status.getInvocations();
+            }
+        },
+        new ValueReducer<StatusSnapshot, Long>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long procMillis = 0L;
+                int invocations = 0;
+
+                for (final StatusSnapshot snapshot : values) {
+                    procMillis += 
snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue();
+                    invocations += 
snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).intValue();
+                }
+
+                if (invocations == 0) {
+                    return 0L;
+                }
+
+                return procMillis / invocations;
+            }
+        }));
+
+    private MetricDescriptor<ProcessorStatus> descriptor;
+
+    private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> 
descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    public MetricDescriptor<ProcessorStatus> getDescriptor() {
+        return descriptor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
new file mode 100644
index 0000000..0499d65
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+public enum RemoteProcessGroupStatusDescriptor {
+    SENT_BYTES(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 
mins)",
+        "The cumulative size of all FlowFiles that have been successfully sent 
to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getSentContentSize();
+            }
+        })),
+
+    SENT_COUNT(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent 
(5 mins)",
+        "The number of FlowFiles that have been successfully sent to the 
remote system in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getSentCount().longValue());
+            }
+        })),
+
+    RECEIVED_BYTES(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes 
Received (5 mins)",
+        "The cumulative size of all FlowFiles that have been received from the 
remote system in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getReceivedContentSize();
+            }
+        })),
+
+    RECEIVED_COUNT(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles 
Received (5 mins)",
+        "The number of FlowFiles that have been received from the remote 
system in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getReceivedCount().longValue());
+            }
+        })),
+
+    RECEIVED_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", 
"Received Bytes Per Second",
+        "The data rate at which data was received from the remote system in 
the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return 
Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
+            }
+        })),
+
+    SENT_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent 
Bytes Per Second",
+        "The data rate at which data was received from the remote system in 
the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getSentContentSize().longValue() / 
300L);
+            }
+        })),
+
+    TOTAL_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", 
"Total Bytes Per Second",
+        "The sum of the send and receive data rate from the remote system in 
the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return 
Long.valueOf((status.getReceivedContentSize().longValue() + 
status.getSentContentSize().longValue()) / 300L);
+            }
+        })),
+
+    AVERAGE_LINEAGE_DURATION(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "averageLineageDuration",
+        "Average Lineage Duration (5 mins)",
+        "The average amount of time that a FlowFile took to process from 
receipt to drop in the past 5 minutes. For Processors that do not terminate 
FlowFiles, this value will be 0.",
+        Formatter.DURATION,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
+            }
+        }, new ValueReducer<StatusSnapshot, Long>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long millis = 0L;
+                int count = 0;
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long sent = 
snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue();
+                    count += sent;
+
+                    final long avgMillis = 
snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    final long totalMillis = avgMillis * sent;
+                    millis += totalMillis;
+                }
+
+                return count == 0 ? 0 : millis / count;
+            }
+        }));
+
+    private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
+
+    private RemoteProcessGroupStatusDescriptor(final 
MetricDescriptor<RemoteProcessGroupStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() {
+        return descriptor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
index 014b0a6..2b0cd9e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
@@ -44,9 +44,9 @@ public class StatusHistoryUtil {
 
         final StatusHistoryDTO dto = new StatusHistoryDTO();
         dto.setGenerated(new Date());
-        dto.setDetails(componentDetails);
+        dto.setComponentDetails(componentDetails);
         
dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(metricDescriptors));
-        dto.setStatusSnapshots(snapshotDtos);
+        dto.setAggregateStatusSnapshots(snapshotDtos);
         return dto;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index 02f7d6b..36288d5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -20,19 +20,16 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 import org.apache.nifi.util.ComponentStatusReport;
-import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ComponentStatusReport.ComponentType;
+import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,22 +230,9 @@ public class VolatileComponentStatusRepository implements 
ComponentStatusReposit
         return history;
     }
 
-    private static long calculateTaskMillis(final ProcessGroupStatus status) {
-        long nanos = 0L;
 
-        for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
-            nanos += procStatus.getProcessingNanos();
-        }
-
-        for (final ProcessGroupStatus childStatus : 
status.getProcessGroupStatus()) {
-            nanos += calculateTaskMillis(childStatus);
-        }
-
-        return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
-    }
 
     private static class Capture {
-
         private final Date captureDate;
         private final ComponentStatusReport statusReport;
 
@@ -266,407 +250,7 @@ public class VolatileComponentStatusRepository implements 
ComponentStatusReposit
         }
     }
 
-    public static enum RemoteProcessGroupStatusDescriptor {
-
-        SENT_BYTES(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 
mins)",
-                "The cumulative size of all FlowFiles that have been 
successfully sent to the remote system in the past 5 minutes", 
Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final RemoteProcessGroupStatus 
status) {
-                        return status.getSentContentSize();
-                    }
-                })),
-        SENT_COUNT(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent 
(5 mins)",
-                "The number of FlowFiles that have been successfully sent to 
the remote system in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<RemoteProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final RemoteProcessGroupStatus 
status) {
-                        return Long.valueOf(status.getSentCount().longValue());
-                    }
-                })),
-        RECEIVED_BYTES(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes 
Received (5 mins)",
-                "The cumulative size of all FlowFiles that have been received 
from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final RemoteProcessGroupStatus 
status) {
-                        return status.getReceivedContentSize();
-                    }
-                })),
-        RECEIVED_COUNT(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles 
Received (5 mins)",
-                "The number of FlowFiles that have been received from the 
remote system in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<RemoteProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final RemoteProcessGroupStatus 
status) {
-                        return 
Long.valueOf(status.getReceivedCount().longValue());
-                    }
-                })),
-        RECEIVED_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", 
"Received Bytes Per Second",
-                "The data rate at which data was received from the remote 
system in the past 5 minutes in terms of Bytes Per Second",
-                Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final RemoteProcessGroupStatus 
status) {
-                        return 
Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
-                    }
-                })),
-        SENT_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent 
Bytes Per Second",
-                "The data rate at which data was received from the remote 
system in the past 5 minutes in terms of Bytes Per Second", 
Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final RemoteProcessGroupStatus 
status) {
-                        return 
Long.valueOf(status.getSentContentSize().longValue() / 300L);
-                    }
-                })),
-        TOTAL_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", 
"Total Bytes Per Second",
-                "The sum of the send and receive data rate from the remote 
system in the past 5 minutes in terms of Bytes Per Second",
-                Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final RemoteProcessGroupStatus 
status) {
-                        return 
Long.valueOf((status.getReceivedContentSize().longValue() + 
status.getSentContentSize().longValue()) / 300L);
-                    }
-                })),
-        AVERAGE_LINEAGE_DURATION(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>(
-                "averageLineageDuration",
-                "Average Lineage Duration (5 mins)",
-                "The average amount of time that a FlowFile took to process 
from receipt to drop in the past 5 minutes. For Processors that do not 
terminate FlowFiles, this value will be 0.",
-                Formatter.DURATION,
-                new ValueMapper<RemoteProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final RemoteProcessGroupStatus 
status) {
-                        return 
status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
-                    }
-                }, new ValueReducer<StatusSnapshot, Long>() {
-                    @Override
-                    public Long reduce(final List<StatusSnapshot> values) {
-                        long millis = 0L;
-                        int count = 0;
-
-                        for (final StatusSnapshot snapshot : values) {
-                            final long sent = 
snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue();
-                            count += sent;
-
-                            final long avgMillis = 
snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
-                            final long totalMillis = avgMillis * sent;
-                            millis += totalMillis;
-                        }
-
-                        return count == 0 ? 0 : millis / count;
-                    }
-                }
-        ));
-
-        private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
-
-        private RemoteProcessGroupStatusDescriptor(final 
MetricDescriptor<RemoteProcessGroupStatus> descriptor) {
-            this.descriptor = descriptor;
-        }
-
-        public String getField() {
-            return descriptor.getField();
-        }
-
-        public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() {
-            return descriptor;
-        }
-    }
-
-    public static enum ProcessGroupStatusDescriptor {
-
-        BYTES_READ(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)",
-                "The total number of bytes read from Content Repository by 
Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, 
new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return status.getBytesRead();
-                    }
-                })),
-        BYTES_WRITTEN(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 
mins)",
-                "The total number of bytes written to Content Repository by 
Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, 
new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return status.getBytesWritten();
-                    }
-                })),
-        BYTES_TRANSFERRED(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes 
Transferred (5 mins)",
-                "The total number of bytes read from or written to Content 
Repository by Processors in this Process Group in the past 5 minutes",
-                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return status.getBytesRead() + 
status.getBytesWritten();
-                    }
-                })),
-        INPUT_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)",
-                "The cumulative size of all FlowFiles that have entered this 
Process Group via its Input Ports in the past 5 minutes",
-                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return status.getInputContentSize();
-                    }
-                })),
-        INPUT_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 
mins)",
-                "The number of FlowFiles that have entered this Process Group 
via its Input Ports in the past 5 minutes",
-                Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return status.getInputCount().longValue();
-                    }
-                })),
-        OUTPUT_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 
mins)",
-                "The cumulative size of all FlowFiles that have exited this 
Process Group via its Output Ports in the past 5 minutes",
-                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return status.getOutputContentSize();
-                    }
-                })),
-        OUTPUT_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 
mins)",
-                "The number of FlowFiles that have exited this Process Group 
via its Output Ports in the past 5 minutes",
-                Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return status.getOutputCount().longValue();
-                    }
-                })),
-        QUEUED_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes",
-                "The cumulative size of all FlowFiles queued in all 
Connections of this Process Group",
-                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return status.getQueuedContentSize();
-                    }
-                })),
-        QUEUED_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count",
-                "The number of FlowFiles queued in all Connections of this 
Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return status.getQueuedCount().longValue();
-                    }
-                })),
-        TASK_MILLIS(new 
StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration 
(5 mins)",
-                "The total number of thread-milliseconds that the Processors 
within this ProcessGroup have used to complete their tasks in the past 5 
minutes",
-                Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
-                    @Override
-                    public Long getValue(final ProcessGroupStatus status) {
-                        return calculateTaskMillis(status);
-                    }
-                }));
-
-        private MetricDescriptor<ProcessGroupStatus> descriptor;
-
-        private ProcessGroupStatusDescriptor(final 
MetricDescriptor<ProcessGroupStatus> descriptor) {
-            this.descriptor = descriptor;
-        }
-
-        public String getField() {
-            return descriptor.getField();
-        }
-
-        public MetricDescriptor<ProcessGroupStatus> getDescriptor() {
-            return descriptor;
-        }
-    }
-
-    public static enum ConnectionStatusDescriptor {
-
-        INPUT_BYTES(new 
StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)",
-                "The cumulative size of all FlowFiles that were transferred to 
this Connection in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ConnectionStatus>() {
-                    @Override
-                    public Long getValue(final ConnectionStatus status) {
-                        return status.getInputBytes();
-                    }
-                })),
-        INPUT_COUNT(new 
StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 
mins)",
-                "The number of FlowFiles that were transferred to this 
Connection in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<ConnectionStatus>() {
-                    @Override
-                    public Long getValue(final ConnectionStatus status) {
-                        return Long.valueOf(status.getInputCount());
-                    }
-                })),
-        OUTPUT_BYTES(new 
StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)",
-                "The cumulative size of all FlowFiles that were pulled from 
this Connection in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ConnectionStatus>() {
-                    @Override
-                    public Long getValue(final ConnectionStatus status) {
-                        return status.getOutputBytes();
-                    }
-                })),
-        OUTPUT_COUNT(new 
StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 
mins)",
-                "The number of FlowFiles that were pulled from this Connection 
in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
-                    @Override
-                    public Long getValue(final ConnectionStatus status) {
-                        return Long.valueOf(status.getOutputCount());
-                    }
-                })),
-        QUEUED_BYTES(new 
StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes",
-                "The number of Bytes queued in this Connection", 
Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
-                    @Override
-                    public Long getValue(final ConnectionStatus status) {
-                        return status.getQueuedBytes();
-                    }
-                })),
-        QUEUED_COUNT(new 
StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count",
-                "The number of FlowFiles queued in this Connection", 
Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
-                    @Override
-                    public Long getValue(final ConnectionStatus status) {
-                        return Long.valueOf(status.getQueuedCount());
-                    }
-                }));
-
-        private MetricDescriptor<ConnectionStatus> descriptor;
-
-        private ConnectionStatusDescriptor(final 
MetricDescriptor<ConnectionStatus> descriptor) {
-            this.descriptor = descriptor;
-        }
-
-        public String getField() {
-            return descriptor.getField();
-        }
-
-        public MetricDescriptor<ConnectionStatus> getDescriptor() {
-            return descriptor;
-        }
-    }
 
-    public static enum ProcessorStatusDescriptor {
-
-        BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", 
"Bytes Read (5 mins)",
-                "The total number of bytes read from the Content Repository by 
this Processor in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return status.getBytesRead();
-                    }
-                })),
-        BYTES_WRITTEN(new 
StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 
mins)",
-                "The total number of bytes written to the Content Repository 
by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return status.getBytesWritten();
-                    }
-                })),
-        BYTES_TRANSFERRED(new 
StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes 
Transferred (5 mins)",
-                "The total number of bytes read from or written to the Content 
Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return status.getBytesRead() + 
status.getBytesWritten();
-                    }
-                })),
-        INPUT_BYTES(new 
StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)",
-                "The cumulative size of all FlowFiles that this Processor has 
pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return status.getInputBytes();
-                    }
-                })),
-        INPUT_COUNT(new 
StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)",
-                "The number of FlowFiles that this Processor has pulled from 
its queues in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return Long.valueOf(status.getInputCount());
-                    }
-                })),
-        OUTPUT_BYTES(new 
StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)",
-                "The cumulative size of all FlowFiles that this Processor has 
transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, 
new ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return status.getOutputBytes();
-                    }
-                })),
-        OUTPUT_COUNT(new 
StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 
mins)",
-                "The number of FlowFiles that this Processor has transferred 
to downstream queues in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return Long.valueOf(status.getOutputCount());
-                    }
-                })),
-        TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", 
"Tasks (5 mins)", "The number of tasks that this Processor has completed in the 
past 5 minutes",
-                Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return Long.valueOf(status.getInvocations());
-                    }
-                })),
-        TASK_MILLIS(new 
StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 
mins)",
-                "The total number of thread-milliseconds that the Processor 
has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new 
ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return 
TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), 
TimeUnit.NANOSECONDS);
-                    }
-                })),
-        FLOWFILES_REMOVED(new 
StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles 
Removed (5 mins)",
-                "The total number of FlowFiles removed by this Processor in 
the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return Long.valueOf(status.getFlowFilesRemoved());
-                    }
-                })),
-        AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
-                "averageLineageDuration",
-                "Average Lineage Duration (5 mins)",
-                "The average amount of time that a FlowFile took to process 
(from receipt until this Processor finished processing it) in the past 5 
minutes.",
-                Formatter.DURATION,
-                new ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return 
status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
-                    }
-                }, new ValueReducer<StatusSnapshot, Long>() {
-                    @Override
-                    public Long reduce(final List<StatusSnapshot> values) {
-                        long millis = 0L;
-                        int count = 0;
-
-                        for (final StatusSnapshot snapshot : values) {
-                            final long removed = 
snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue();
-                            count += removed;
-
-                            count += 
snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue();
-
-                            final long avgMillis = 
snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
-                            final long totalMillis = avgMillis * removed;
-                            millis += totalMillis;
-                        }
-
-                        return count == 0 ? 0 : millis / count;
-                    }
-                }
-        )),
-        AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
-                "averageTaskMillis",
-                "Average Task Duration",
-                "The average duration it took this Processor to complete a 
task, as averaged over the past 5 minutes",
-                Formatter.DURATION,
-                new ValueMapper<ProcessorStatus>() {
-                    @Override
-                    public Long getValue(final ProcessorStatus status) {
-                        return status.getInvocations() == 0 ? 0 : 
TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), 
TimeUnit.NANOSECONDS) / status.getInvocations();
-                    }
-                },
-                new ValueReducer<StatusSnapshot, Long>() {
-                    @Override
-                    public Long reduce(final List<StatusSnapshot> values) {
-                        long procMillis = 0L;
-                        int invocations = 0;
-
-                        for (final StatusSnapshot snapshot : values) {
-                            procMillis += 
snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue();
-                            invocations += 
snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).intValue();
-                        }
-
-                        if (invocations == 0) {
-                            return 0L;
-                        }
-
-                        return procMillis / invocations;
-                    }
-                }
-        ));
-
-        private MetricDescriptor<ProcessorStatus> descriptor;
-
-        private ProcessorStatusDescriptor(final 
MetricDescriptor<ProcessorStatus> descriptor) {
-            this.descriptor = descriptor;
-        }
-
-        public String getField() {
-            return descriptor.getField();
-        }
-
-        public MetricDescriptor<ProcessorStatus> getDescriptor() {
-            return descriptor;
-        }
-    }
 
     @Override
     public List<MetricDescriptor<ConnectionStatus>> 
getConnectionMetricDescriptors() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
index d3cfd9e..ad9208e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
@@ -17,50 +17,24 @@
 package org.apache.nifi.events;
 
 import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.nifi.reporting.Bulletin;
 
 /**
  *
  */
 public class NodeBulletinProcessingStrategy implements 
BulletinProcessingStrategy {
-
-    private final Lock lock;
-    private final Set<Bulletin> bulletins;
-
-    public NodeBulletinProcessingStrategy() {
-        lock = new ReentrantLock();
-        bulletins = new LinkedHashSet<>();
-    }
+    private final CircularFifoQueue<Bulletin> ringBuffer = new 
CircularFifoQueue<>(5);
 
     @Override
-    public void update(final Bulletin bulletin) {
-        lock.lock();
-        try {
-            bulletins.add(bulletin);
-        } finally {
-            lock.unlock();
-        }
+    public synchronized void update(final Bulletin bulletin) {
+        ringBuffer.add(bulletin);
     }
 
-    public Set<Bulletin> getBulletins() {
-        final Set<Bulletin> response = new HashSet<>();
-
-        lock.lock();
-        try {
-            // get all the bulletins currently stored
-            response.addAll(bulletins);
-
-            // remove the bulletins
-            bulletins.clear();
-        } finally {
-            lock.unlock();
-        }
-
+    public synchronized Set<Bulletin> getBulletins() {
+        final Set<Bulletin> response = new HashSet<>(ringBuffer);
         return response;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index 8aeb34d..7202546 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -19,7 +19,6 @@ package org.apache.nifi.events;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -207,25 +206,6 @@ public class VolatileBulletinRepository implements 
BulletinRepository {
         return controllerBulletins;
     }
 
-    /**
-     * Overrides the default bulletin processing strategy. When a custom
-     * bulletin strategy is employed, bulletins will not be persisted in this
-     * repository and will sent to the specified strategy instead.
-     *
-     * @param strategy bulletin strategy
-     */
-    public void overrideDefaultBulletinProcessing(final 
BulletinProcessingStrategy strategy) {
-        Objects.requireNonNull(strategy);
-        this.processingStrategy = strategy;
-    }
-
-    /**
-     * Restores the default bulletin processing strategy.
-     */
-    public void restoreDefaultBulletinProcessing() {
-        this.processingStrategy = new DefaultBulletinProcessingStrategy();
-    }
-
     private List<RingBuffer<Bulletin>> getBulletinBuffers(final Bulletin 
bulletin) {
         final String storageKey = getBulletinStoreKey(bulletin);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index bc5245c..a290fec 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -67,7 +67,6 @@ import 
org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
 import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
@@ -1538,11 +1537,6 @@ public interface NiFiServiceFacade {
      */
     ClusterProcessorStatusDTO getClusterProcessorStatus(String processorId);
 
-    /**
-     * @param processorId id
-     * @return the processor status history for each node connected to the 
cluster
-     */
-    ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String 
processorId);
 
     /**
      * Returns a connection's status for each node connected to the cluster.
@@ -1552,17 +1546,6 @@ public interface NiFiServiceFacade {
      */
     ClusterConnectionStatusDTO getClusterConnectionStatus(String connectionId);
 
-    /**
-     * @param connectionId id
-     * @return the connection status history for each node connected to the 
cluster
-     */
-    ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String 
connectionId);
-
-    /**
-     * @param processGroupId id
-     * @return the process group status history for each node connected to the 
cluster
-     */
-    ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String 
processGroupId);
 
     /**
      * Returns a process group's status for each node connected to the cluster.
@@ -1572,13 +1555,6 @@ public interface NiFiServiceFacade {
      */
     ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String 
processorId);
 
-    /**
-     * Returns the remote process group status history for each node connected 
to the cluster.
-     *
-     * @param remoteProcessGroupId a remote process group identifier
-     * @return The cluster status history
-     */
-    ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String 
remoteProcessGroupId);
 
     /**
      * Returns a remote process group's status for each node connected to the 
cluster.

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 945c671..098fd64 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,6 +16,25 @@
  */
 package org.apache.nifi.web;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.WebApplicationException;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -72,7 +91,6 @@ import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.user.AccountStatus;
 import org.apache.nifi.user.NiFiUser;
 import org.apache.nifi.user.NiFiUserGroup;
@@ -131,7 +149,6 @@ import 
org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
 import org.apache.nifi.web.api.dto.status.NodeConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.NodePortStatusDTO;
@@ -160,24 +177,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.access.AccessDeniedException;
 
-import javax.ws.rs.WebApplicationException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Implementation of NiFiServiceFacade that performs revision checking.
  */
@@ -2105,78 +2104,13 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
 
     @Override
     public ProcessGroupStatusDTO getProcessGroupStatus(String groupId) {
-        ProcessGroupStatusDTO statusReport;
-        if (properties.isClusterManager()) {
-            final ProcessGroupStatus mergedProcessGroupStatus = 
clusterManager.getProcessGroupStatus(groupId);
-            if (mergedProcessGroupStatus == null) {
-                throw new ResourceNotFoundException(String.format("Unable to 
find status for process group %s.", groupId));
-            }
-            statusReport = 
dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), 
mergedProcessGroupStatus);
-        } else {
-            statusReport = controllerFacade.getProcessGroupStatus(groupId);
-        }
-        return statusReport;
+        return controllerFacade.getProcessGroupStatus(groupId);
     }
 
     @Override
     public ControllerStatusDTO getControllerStatus() {
-        final ControllerStatusDTO controllerStatus;
-
-        if (properties.isClusterManager()) {
-            final Set<Node> connectedNodes = 
clusterManager.getNodes(Node.Status.CONNECTED);
-
-            if (connectedNodes.isEmpty()) {
-                throw new NoConnectedNodesException();
-            }
-
-            int activeThreadCount = 0;
-            long totalFlowFileObjectCount = 0;
-            long totalFlowFileByteCount = 0;
-            for (final Node node : connectedNodes) {
-                final HeartbeatPayload nodeHeartbeatPayload = 
node.getHeartbeatPayload();
-                if (nodeHeartbeatPayload == null) {
-                    continue;
-                }
-
-                activeThreadCount += 
nodeHeartbeatPayload.getActiveThreadCount();
-                totalFlowFileObjectCount += 
nodeHeartbeatPayload.getTotalFlowFileCount();
-                totalFlowFileByteCount += 
nodeHeartbeatPayload.getTotalFlowFileBytes();
-            }
-
-            controllerStatus = new ControllerStatusDTO();
-            controllerStatus.setActiveThreadCount(activeThreadCount);
-            
controllerStatus.setQueued(FormatUtils.formatCount(totalFlowFileObjectCount) + 
" / " + FormatUtils.formatDataSize(totalFlowFileByteCount));
-
-            final int numNodes = clusterManager.getNodeIds().size();
-            controllerStatus.setConnectedNodes(connectedNodes.size() + " / " + 
numNodes);
-
-            // get the bulletins for the controller
-            final BulletinRepository bulletinRepository = 
clusterManager.getBulletinRepository();
-            
controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
-
-            // get the controller service bulletins
-            final BulletinQuery controllerServiceQuery = new 
BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
-            
controllerStatus.setControllerServiceBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(controllerServiceQuery)));
-
-            // get the reporting task bulletins
-            final BulletinQuery reportingTaskQuery = new 
BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
-            
controllerStatus.setReportingTaskBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(reportingTaskQuery)));
-
-            // get the component counts by extracting them from the roots' 
group status
-            final ProcessGroupStatus status = 
clusterManager.getProcessGroupStatus("root");
-            if (status != null) {
-                final ProcessGroupCounts counts = 
extractProcessGroupCounts(status);
-                controllerStatus.setRunningCount(counts.getRunningCount());
-                controllerStatus.setStoppedCount(counts.getStoppedCount());
-                controllerStatus.setInvalidCount(counts.getInvalidCount());
-                controllerStatus.setDisabledCount(counts.getDisabledCount());
-                
controllerStatus.setActiveRemotePortCount(counts.getActiveRemotePortCount());
-                
controllerStatus.setInactiveRemotePortCount(counts.getInactiveRemotePortCount());
-            }
-        } else {
-            // get the controller status
-            controllerStatus = controllerFacade.getControllerStatus();
-        }
+        // get the controller status
+        final ControllerStatusDTO controllerStatus = 
controllerFacade.getControllerStatus();
 
         // determine if there are any pending user accounts - only include if 
appropriate
         if 
(NiFiUserUtils.getAuthorities().contains(Authority.ROLE_ADMIN.toString())) {
@@ -3371,25 +3305,6 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         return clusterRemoteProcessGroupStatusDto;
     }
 
-    @Override
-    public ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String 
processorId) {
-        return clusterManager.getProcessorStatusHistory(processorId);
-    }
-
-    @Override
-    public ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String 
connectionId) {
-        return clusterManager.getConnectionStatusHistory(connectionId);
-    }
-
-    @Override
-    public ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String 
processGroupId) {
-        return clusterManager.getProcessGroupStatusHistory(processGroupId);
-    }
-
-    @Override
-    public ClusterStatusHistoryDTO 
getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId) {
-        return 
clusterManager.getRemoteProcessGroupStatusHistory(remoteProcessGroupId);
-    }
 
     @Override
     public NodeStatusDTO getNodeStatus(String nodeId) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
index 857df56..3086ab4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.web.api;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -25,6 +27,7 @@ import javax.ws.rs.DefaultValue;
 import javax.ws.rs.FormParam;
 import javax.ws.rs.GET;
 import javax.ws.rs.HEAD;
+import javax.ws.rs.HttpMethod;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -34,6 +37,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.util.NiFiProperties;
@@ -49,23 +53,22 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO;
 import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
+import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
 import org.apache.nifi.web.api.entity.ClusterConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ClusterEntity;
 import org.apache.nifi.web.api.entity.ClusterPortStatusEntity;
+import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.ClusterProcessorStatusEntity;
 import org.apache.nifi.web.api.entity.ClusterRemoteProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
 import org.apache.nifi.web.api.entity.ClusterStatusEntity;
-import org.apache.nifi.web.api.entity.ClusterStatusHistoryEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.security.access.prepost.PreAuthorize;
 
 import com.sun.jersey.api.core.ResourceContext;
@@ -75,8 +78,6 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
-import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
-import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity;
 
 /**
  * RESTful endpoint for managing a cluster.
@@ -92,6 +93,7 @@ public class ClusterResource extends ApplicationResource {
     private ResourceContext resourceContext;
     private NiFiServiceFacade serviceFacade;
     private NiFiProperties properties;
+    private WebClusterManager clusterManager;
 
     /**
      * Locates the ClusterConnection sub-resource.
@@ -595,7 +597,7 @@ public class ClusterResource extends ApplicationResource {
     @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
     @ApiOperation(
             value = "Gets processor status history across the cluster",
-            response = ClusterStatusHistoryEntity.class,
+ response = StatusHistoryEntity.class,
             authorizations = {
                 @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
                 @Authorization(value = "DFM", type = "ROLE_DFM"),
@@ -624,19 +626,16 @@ public class ClusterResource extends ApplicationResource {
             @PathParam("id") String id) {
 
         if (properties.isClusterManager()) {
-            final ClusterStatusHistoryDTO dto = 
serviceFacade.getClusterProcessorStatusHistory(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterStatusHistoryEntity entity = new 
ClusterStatusHistoryEntity();
-            entity.setClusterStatusHistory(dto);
-            entity.setRevision(revision);
+            final URI uri;
+            try {
+                final URI originalUri = getAbsolutePath();
+                final String newPath = "/nifi-api/processors/" + id + 
"/status/history";
+                uri = new URI(originalUri.getScheme(), 
originalUri.getAuthority(), newPath, originalUri.getQuery(), 
originalUri.getFragment());
+            } catch (final URISyntaxException use) {
+                throw new RuntimeException(use);
+            }
 
-            // generate the response
-            return generateOkResponse(entity).build();
+            return clusterManager.applyRequest(HttpMethod.GET, uri, 
getRequestParameters(true), getHeaders()).getResponse();
         }
 
         throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
@@ -705,67 +704,6 @@ public class ClusterResource extends ApplicationResource {
     }
 
     /**
-     * Gets the connections status history for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
-     * @param id The id of the processor
-     * @return A clusterProcessorStatusHistoryEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/connections/{id}/status/history")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets connection status history across the cluster",
-            response = ClusterStatusHistoryEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
-                @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized 
to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but 
NiFi was not in the appropriate state to process it. Retrying the same request 
later may be successful.")
-            }
-    )
-    public Response getConnectionStatusHistory(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The connection id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            final ClusterStatusHistoryDTO dto = 
serviceFacade.getClusterConnectionStatusHistory(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterStatusHistoryEntity entity = new 
ClusterStatusHistoryEntity();
-            entity.setClusterStatusHistory(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
-    }
-
-    /**
      * Gets the process group status for every node.
      *
      * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
@@ -827,66 +765,6 @@ public class ClusterResource extends ApplicationResource {
         throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
     }
 
-    /**
-     * Gets the process group status history for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
-     * @param id The id of the process group
-     * @return A clusterProcessGroupStatusHistoryEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/process-groups/{id}/status/history")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets process group status history across the cluster",
-            response = ClusterStatusHistoryEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
-                @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized 
to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but 
NiFi was not in the appropriate state to process it. Retrying the same request 
later may be successful.")
-            }
-    )
-    public Response getProcessGroupStatusHistory(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The process group id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            final ClusterStatusHistoryDTO dto = 
serviceFacade.getClusterProcessGroupStatusHistory(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterStatusHistoryEntity entity = new 
ClusterStatusHistoryEntity();
-            entity.setClusterStatusHistory(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
-    }
 
     /**
      * Gets the remote process group status for every node.
@@ -1074,66 +952,6 @@ public class ClusterResource extends ApplicationResource {
         throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
     }
 
-    /**
-     * Gets the remote process group status history for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
-     * @param id The id of the processor
-     * @return A clusterRemoteProcessGroupStatusHistoryEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/remote-process-groups/{id}/status/history")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets the remote process group status history across the 
cluster",
-            response = ClusterStatusHistoryEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
-                @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized 
to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but 
NiFi was not in the appropriate state to process it. Retrying the same request 
later may be successful.")
-            }
-    )
-    public Response getRemoteProcessGroupStatusHistory(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The remote process group id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            final ClusterStatusHistoryDTO dto = 
serviceFacade.getClusterRemoteProcessGroupStatusHistory(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterStatusHistoryEntity entity = new 
ClusterStatusHistoryEntity();
-            entity.setClusterStatusHistory(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
-    }
 
     // setters
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {
@@ -1144,4 +962,7 @@ public class ClusterResource extends ApplicationResource {
         this.properties = properties;
     }
 
+    public void setClusterManager(WebClusterManager clusterManager) {
+        this.clusterManager = clusterManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index 738da04..ddc60cd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -16,12 +16,41 @@
  */
 package org.apache.nifi.web.api;
 
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
@@ -33,7 +62,6 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.DownloadableContent;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
@@ -55,43 +83,14 @@ import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.ConnectableTypeParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.StreamingOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 
 /**
  * RESTful endpoint for managing a Connection.
@@ -99,8 +98,6 @@ import java.util.UUID;
 @Api(hidden = true)
 public class ConnectionResource extends ApplicationResource {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ConnectionResource.class);
-
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
@@ -327,7 +324,7 @@ public class ConnectionResource extends ApplicationResource 
{
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            throw new IllegalClusterResourceRequestException("This request is 
only supported in standalone mode.");
+            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
         // get the specified processor status history
@@ -565,7 +562,7 @@ public class ConnectionResource extends ApplicationResource 
{
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, 
putUri, updateClientId(connectionEntity), 
getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, 
updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse();
         }
 
         // get the connection

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index ef62a62..36b25f2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -524,6 +524,10 @@ public class ControllerResource extends 
ApplicationResource {
             )
             @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId) {
 
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
         final ControllerStatusDTO controllerStatus = 
serviceFacade.getControllerStatus();
 
         // create the revision

Reply via email to