Repository: nifi
Updated Branches:
  refs/heads/NIFI-1563 7f49f8723 -> a901bc65f


NIFI-1563: Bug fixes; code cleanup; replicate requests to bulletin board 
endpoint


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/deba41d8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/deba41d8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/deba41d8

Branch: refs/heads/NIFI-1563
Commit: deba41d8e0ae86208f0c1d5a50a191f14c1b9ab1
Parents: 7f49f87
Author: Mark Payne <[email protected]>
Authored: Mon Mar 7 15:27:21 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Mar 7 15:27:21 2016 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 115 +++++++++++++------
 .../org/apache/nifi/controller/Heartbeater.java |  22 ----
 .../service/ControllerServiceNode.java          |  13 +--
 .../nifi/connectable/StandardConnection.java    |   9 +-
 .../apache/nifi/controller/FlowController.java  |   7 +-
 .../nifi/controller/StandardFlowFileQueue.java  |   9 +-
 .../scheduling/StandardProcessScheduler.java    |  12 +-
 .../service/StandardControllerServiceNode.java  |  23 ++--
 .../controller/TestStandardFlowFileQueue.java   |   2 +-
 .../repository/TestStandardProcessSession.java  |   2 +-
 .../TestWriteAheadFlowFileRepository.java       |   3 +-
 .../TestStandardProcessScheduler.java           |   8 +-
 .../TestStandardControllerServiceProvider.java  |   4 +-
 .../nifi/web/api/BulletinBoardResource.java     |  36 ++++--
 .../nifi/web/controller/ControllerFacade.java   |   2 +
 .../src/main/resources/nifi-web-api-context.xml |   2 +
 16 files changed, 140 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index af62111..3cd8b05 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -126,7 +126,6 @@ import 
org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.StandardFlowSerializer;
@@ -149,7 +148,6 @@ import 
org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.SortedStateUtils;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
-import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.MetricDescriptor;
 import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
@@ -176,7 +174,6 @@ import org.apache.nifi.logging.NiFiLog;
 import org.apache.nifi.logging.ReportingTaskLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.remote.RemoteResourceManager;
@@ -202,6 +199,7 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.UpdateRevision;
+import org.apache.nifi.web.api.dto.BulletinBoardDTO;
 import org.apache.nifi.web.api.dto.BulletinDTO;
 import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
@@ -231,6 +229,7 @@ import org.apache.nifi.web.api.dto.status.StatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusMerger;
 import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.BulletinBoardEntity;
 import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
@@ -317,7 +316,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
      */
     private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
 
-    public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = 
"org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
 
     public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
     public static final Pattern PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
@@ -345,6 +343,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks/node";
     public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
     public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state");
+    public static final Pattern BULLETIN_BOARD_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/bulletin-board");
 
     public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN =
         
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status/history");
@@ -468,11 +467,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             throw new RuntimeException(e);
         }
 
-        processScheduler = new StandardProcessScheduler(new Heartbeater() {
-            @Override
-            public void heartbeat() {
-            }
-        }, this, encryptor, stateManagerProvider);
+        processScheduler = new StandardProcessScheduler(this, encryptor, 
stateManagerProvider);
 
         // When we construct the scheduling agents, we can pass null for a lot 
of the arguments because we are only
         // going to be scheduling Reporting Tasks. Otherwise, it would not be 
okay.
@@ -1870,20 +1865,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     }
 
 
-    private ComponentStatusRepository createComponentStatusRepository() {
-        final String implementationClassName = 
properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION,
 DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
-        if (implementationClassName == null) {
-            throw new RuntimeException("Cannot create Component Status 
Repository because the NiFi Properties is missing the following property: "
-                    + 
NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
-        }
-
-        try {
-            return 
NarThreadContextClassLoader.createInstance(implementationClassName, 
ComponentStatusRepository.class);
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @Override
     public Set<Node> getNodes(final Status... statuses) {
         final Set<Status> desiredStatusSet = new HashSet<>();
@@ -2473,6 +2454,10 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return "GET".equalsIgnoreCase(method) && 
CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
+    private static boolean isBulletinBoardEndpoint(final URI uri, final String 
method) {
+        return "GET".equalsIgnoreCase(method) && 
BULLETIN_BOARD_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
 
     private static boolean isRemoteProcessGroupEndpoint(final URI uri, final 
String method) {
         if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) {
@@ -2577,9 +2562,10 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                 || isControllerServiceReferenceEndpoint(uri, method) || 
isControllerServiceStateEndpoint(uri, method)
                 || isReportingTasksEndpoint(uri, method) || 
isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, 
method)
                 || isDropRequestEndpoint(uri, method) || 
isListFlowFilesEndpoint(uri, method)
-            || isGroupStatusEndpoint(uri, method) || 
isControllerStatusEndpoint(uri, method)
-            || isProcessorStatusHistoryEndpoint(uri, method) || 
isProcessGroupStatusHistoryEndpoint(uri, method)
-            || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || 
isConnectionStatusHistoryEndpoint(uri, method);
+                || isGroupStatusEndpoint(uri, method) || 
isControllerStatusEndpoint(uri, method)
+                || isProcessorStatusHistoryEndpoint(uri, method) || 
isProcessGroupStatusHistoryEndpoint(uri, method)
+                || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || 
isConnectionStatusHistoryEndpoint(uri, method)
+                || isBulletinBoardEndpoint(uri, method);
     }
 
     private void mergeProcessorValidationErrors(final ProcessorDTO processor, 
Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2750,6 +2736,34 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return mergedBulletins;
     }
 
+    private void mergeBulletinBoard(final BulletinBoardDTO nodeBulletinBoard, 
final Map<NodeIdentifier, BulletinBoardDTO> resultMap) {
+        final List<BulletinDTO> bulletinDtos = new ArrayList<>();
+        for (final Map.Entry<NodeIdentifier, BulletinBoardDTO> entry : 
resultMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final BulletinBoardDTO boardDto = entry.getValue();
+            final String nodeAddress = nodeId.getApiAddress() + ":" + 
nodeId.getApiPort();
+
+            for (final BulletinDTO bulletin : boardDto.getBulletins()) {
+                bulletin.setNodeAddress(nodeAddress);
+                bulletinDtos.add(bulletin);
+            }
+        }
+
+        Collections.sort(bulletinDtos, new Comparator<BulletinDTO>() {
+            @Override
+            public int compare(final BulletinDTO o1, final BulletinDTO o2) {
+                final int timeComparison = 
o1.getTimestamp().compareTo(o2.getTimestamp());
+                if (timeComparison != 0) {
+                    return timeComparison;
+                }
+
+                return o1.getNodeAddress().compareTo(o2.getNodeAddress());
+            }
+        });
+
+        nodeBulletinBoard.setBulletins(bulletinDtos);
+    }
+
     /**
      * Creates BulletinDTOs for the specified Bulletins.
      *
@@ -3757,6 +3771,24 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             mergeControllerStatus(statusRequest, resultsMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isBulletinBoardEndpoint(uri, 
method)) {
+            final BulletinBoardEntity responseEntity = 
clientResponse.getClientResponse().getEntity(BulletinBoardEntity.class);
+            final BulletinBoardDTO responseDto = 
responseEntity.getBulletinBoard();
+
+            final Map<NodeIdentifier, BulletinBoardDTO> resultsMap = new 
HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final BulletinBoardEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(BulletinBoardEntity.class);
+                final BulletinBoardDTO nodeStatus = 
nodeResponseEntity.getBulletinBoard();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeBulletinBoard(responseDto, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else if (hasSuccessfulClientResponse && 
isProcessorStatusHistoryEndpoint(uri, method)) {
             final Map<String, MetricDescriptor<?>> metricDescriptors = new 
HashMap<>();
             for (final ProcessorStatusDescriptor descriptor : 
ProcessorStatusDescriptor.values()) {
@@ -4462,25 +4494,40 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     }
 
     private List<StatusSnapshotDTO> mergeStatusHistories(final 
List<NodeStatusSnapshotsDTO> nodeStatusSnapshots, final Map<String, 
MetricDescriptor<?>> metricDescriptors) {
-        // Map of "normalized Date" (i.e., a time range, essentially) to all 
Snapshots for that time. The list
-        // will contain one snapshot for each node.
-        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
+        // We want a Map<Date, List<StatusSnapshot>>, which is a Map of 
"normalized Date" (i.e., a time range, essentially)
+        // to all Snapshots for that time. The list will contain one snapshot 
for each node. However, we can have the case
+        // where the NCM has a different value for the 
componentStatusSnapshotMillis than the nodes have. In this case,
+        // we end up with multiple entries in the List<StatusSnapshot> for the 
same node/timestamp, which skews our aggregate
+        // results. In order to avoid this, we will use only the latest 
snapshot for a node that falls into the the time range
+        // of interest.
+        // To accomplish this, we have an intermediate data structure, which 
is a Map of "normalized Date" to an inner Map
+        // of Node Identifier to StatusSnapshot. We then will flatten this Map 
and aggregate the results.
+        final Map<Date, Map<String, StatusSnapshot>> dateToNodeSnapshots = new 
TreeMap<>();
 
         // group status snapshot's for each node by date
         for (final NodeStatusSnapshotsDTO nodeStatusSnapshot : 
nodeStatusSnapshots) {
             for (final StatusSnapshotDTO snapshotDto : 
nodeStatusSnapshot.getStatusSnapshots()) {
                 final StatusSnapshot snapshot = createSnapshot(snapshotDto, 
metricDescriptors);
                 final Date normalizedDate = 
normalizeStatusSnapshotDate(snapshot.getTimestamp(), 
componentStatusSnapshotMillis);
-                List<StatusSnapshot> snapshots = 
snapshotsToAggregate.get(normalizedDate);
-                if (snapshots == null) {
-                    snapshots = new ArrayList<>();
-                    snapshotsToAggregate.put(normalizedDate, snapshots);
+
+                Map<String, StatusSnapshot> nodeToSnapshotMap = 
dateToNodeSnapshots.get(normalizedDate);
+                if (nodeToSnapshotMap == null) {
+                    nodeToSnapshotMap = new HashMap<>();
+                    dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap);
                 }
-                snapshots.add(snapshot);
+                nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), 
snapshot);
             }
         }
 
         // aggregate the snapshots by (normalized) timestamp
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
+        for (final Map.Entry<Date, Map<String, StatusSnapshot>> entry : 
dateToNodeSnapshots.entrySet()) {
+            final Date normalizedDate = entry.getKey();
+            final Map<String, StatusSnapshot> nodeToSnapshot = 
entry.getValue();
+            final List<StatusSnapshot> snapshotsForTimestamp = new 
ArrayList<>(nodeToSnapshot.values());
+            snapshotsToAggregate.put(normalizedDate, snapshotsForTimestamp);
+        }
+
         final List<StatusSnapshotDTO> aggregatedSnapshots = 
aggregate(snapshotsToAggregate);
         return aggregatedSnapshots;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
deleted file mode 100644
index 1195bc9..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-public interface Heartbeater {
-
-    void heartbeat();
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index e91ba9a..0d7f3ff 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
 
 public interface ControllerServiceNode extends ConfiguredComponent {
 
@@ -64,10 +63,8 @@ public interface ControllerServiceNode extends 
ConfiguredComponent {
      *            initiate service enabling task as well as its re-tries
      * @param administrativeYieldMillis
      *            the amount of milliseconds to wait for administrative yield
-     * @param heartbeater
-     *            the instance of {@link Heartbeater}
      */
-    void enable(ScheduledExecutorService scheduler, long 
administrativeYieldMillis, Heartbeater heartbeater);
+    void enable(ScheduledExecutorService scheduler, long 
administrativeYieldMillis);
 
     /**
      * Will disable this service. Disabling of the service typically means
@@ -76,10 +73,8 @@ public interface ControllerServiceNode extends 
ConfiguredComponent {
      * @param scheduler
      *            implementation of {@link ScheduledExecutorService} used to
      *            initiate service disabling task
-     * @param heartbeater
-     *            the instance of {@link Heartbeater}
      */
-    void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
+    void disable(ScheduledExecutorService scheduler);
 
     /**
      * @return the ControllerServiceReference that describes which components 
are referencing this Controller Service
@@ -139,12 +134,12 @@ public interface ControllerServiceNode extends 
ConfiguredComponent {
     /**
      * Returns 'true' if this service is active. The service is considered to 
be
      * active if and only if it's
-     * {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation
+     * {@link #enable(ScheduledExecutorService, long)} operation
      * has been invoked and the service has been transitioned to ENABLING 
state.
      * The service will also remain 'active' after its been transitioned to
      * ENABLED state. <br>
      * The service will be de-activated upon invocation of
-     * {@link #disable(ScheduledExecutorService, Heartbeater)}.
+     * {@link #disable(ScheduledExecutorService)}.
      */
     boolean isActive();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 1ef18c0..d43a3db 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -71,7 +70,7 @@ public final class StandardConnection implements Connection {
         relationships = new 
AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
         scheduler = builder.scheduler;
         flowFileQueue = new StandardFlowFileQueue(id, this, 
builder.flowFileRepository, builder.provenanceRepository, 
builder.resourceClaimManager,
-            scheduler, builder.swapManager, builder.eventReporter, 
NiFiProperties.getInstance().getQueueSwapThreshold(), builder.heartbeater);
+            scheduler, builder.swapManager, builder.eventReporter, 
NiFiProperties.getInstance().getQueueSwapThreshold());
         hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
     }
 
@@ -270,7 +269,6 @@ public final class StandardConnection implements Connection 
{
         private FlowFileRepository flowFileRepository;
         private ProvenanceEventRepository provenanceRepository;
         private ResourceClaimManager resourceClaimManager;
-        private Heartbeater heartbeater;
 
         public Builder(final ProcessScheduler scheduler) {
             this.scheduler = scheduler;
@@ -306,11 +304,6 @@ public final class StandardConnection implements 
Connection {
             return this;
         }
 
-        public Builder heartbeater(final Heartbeater heartbeater) {
-            this.heartbeater = heartbeater;
-            return this;
-        }
-
         public Builder bendPoints(final List<Position> bendPoints) {
             this.bendPoints.clear();
             this.bendPoints.addAll(bendPoints);

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7ea0408..95c2984 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -213,7 +213,7 @@ import org.slf4j.LoggerFactory;
 
 import com.sun.jersey.api.client.ClientHandlerException;
 
-public class FlowController implements EventAccess, ControllerServiceProvider, 
ReportingTaskProvider, Heartbeater, QueueProvider {
+public class FlowController implements EventAccess, ControllerServiceProvider, 
ReportingTaskProvider, QueueProvider {
 
     // default repository implementations
     public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = 
"org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -428,7 +428,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             throw new RuntimeException(e);
         }
 
-        processScheduler = new StandardProcessScheduler(this, this, encryptor, 
stateManagerProvider);
+        processScheduler = new StandardProcessScheduler(this, encryptor, 
stateManagerProvider);
         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, 
processScheduler);
         controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler, bulletinRepository, 
stateManagerProvider);
 
@@ -824,7 +824,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             .resourceClaimManager(resourceClaimManager)
             .flowFileRepository(flowFileRepository)
             .provenanceRepository(provenanceEventRepository)
-            .heartbeater(this)
             .build();
     }
 
@@ -2890,7 +2889,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     public Counter resetCounter(final String identifier) {
         final CounterRepository counterRepo = counterRepositoryRef.get();
         final Counter resetValue = counterRepo.resetCounter(identifier);
-        heartbeat();
         return resetValue;
     }
 
@@ -3597,7 +3595,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
-    @Override
     public void heartbeat() {
         if (!isClustered()) {
             return;

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 22aacdc..4afd069 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -109,7 +109,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     private final FlowFileRepository flowFileRepository;
     private final ProvenanceEventRepository provRepository;
     private final ResourceClaimManager resourceClaimManager;
-    private final Heartbeater heartbeater;
 
     private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = 
new ConcurrentHashMap<>();
     private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = 
new ConcurrentHashMap<>();
@@ -118,8 +117,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     private final ProcessScheduler scheduler;
 
     public StandardFlowFileQueue(final String identifier, final Connection 
connection, final FlowFileRepository flowFileRepo, final 
ProvenanceEventRepository provRepo,
-        final ResourceClaimManager resourceClaimManager, final 
ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final 
EventReporter eventReporter, final int swapThreshold,
-        final Heartbeater heartbeater) {
+        final ResourceClaimManager resourceClaimManager, final 
ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final 
EventReporter eventReporter, final int swapThreshold) {
         activeQueue = new PriorityQueue<>(20, new Prioritizer(new 
ArrayList<FlowFilePrioritizer>()));
         priorities = new ArrayList<>();
         swapQueue = new ArrayList<>();
@@ -133,7 +131,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         this.swapThreshold = swapThreshold;
         this.scheduler = scheduler;
         this.connection = connection;
-        this.heartbeater = heartbeater;
 
         readLock = new TimedLock(this.lock.readLock(), identifier + " Read 
Lock", 100);
         writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write 
Lock", 100);
@@ -710,6 +707,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         }
 
         @Override
+        @SuppressWarnings("deprecation")
         public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
             int returnVal = 0;
             final boolean f1Penalized = f1.isPenalized();
@@ -1145,9 +1143,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                         logger.info("Successfully dropped {} FlowFiles ({} 
bytes) from Connection with ID {} on behalf of {}",
                             dropRequest.getDroppedSize().getObjectCount(), 
dropRequest.getDroppedSize().getByteCount(), 
StandardFlowFileQueue.this.getIdentifier(), requestor);
                         dropRequest.setState(DropFlowFileState.COMPLETE);
-                        if (heartbeater != null) {
-                            heartbeater.heartbeat();
-                        }
                     } catch (final Exception e) {
                         logger.error("Failed to drop FlowFiles from Connection 
with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), 
e.toString());
                         logger.error("", e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index ef27fb5..fe1e850 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -40,7 +40,6 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
@@ -73,7 +72,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     private static final Logger LOG = 
LoggerFactory.getLogger(StandardProcessScheduler.class);
 
     private final ControllerServiceProvider controllerServiceProvider;
-    private final Heartbeater heartbeater;
     private final long administrativeYieldMillis;
     private final String administrativeYieldDuration;
     private final StateManagerProvider stateManagerProvider;
@@ -87,9 +85,8 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
     private final StringEncryptor encryptor;
 
-    public StandardProcessScheduler(final Heartbeater heartbeater, final 
ControllerServiceProvider controllerServiceProvider, final StringEncryptor 
encryptor,
+    public StandardProcessScheduler(final ControllerServiceProvider 
controllerServiceProvider, final StringEncryptor encryptor,
         final StateManagerProvider stateManagerProvider) {
-        this.heartbeater = heartbeater;
         this.controllerServiceProvider = controllerServiceProvider;
         this.encryptor = encryptor;
         this.stateManagerProvider = stateManagerProvider;
@@ -367,7 +364,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
                                 
getSchedulingAgent(procNode).schedule(procNode, scheduleState);
 
-                                heartbeater.heartbeat();
                                 return;
                             }
                         } catch (final Exception e) {
@@ -446,7 +442,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                     // If no threads currently running, call the OnStopped 
methods
                     if (state.getActiveThreadCount() == 0 && 
state.mustCallOnStoppedMethods()) {
                         
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
procNode.getProcessor(), processContext);
-                        heartbeater.heartbeat();
                     }
                 }
             }
@@ -525,7 +520,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
             final ConnectableProcessContext processContext = new 
ConnectableProcessContext(connectable, encryptor, 
getStateManager(connectable.getIdentifier()));
             try (final NarCloseable x = NarCloseable.withNarLoader()) {
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
connectable, processContext);
-                heartbeater.heartbeat();
             }
         }
     }
@@ -636,12 +630,12 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
     @Override
     public void enableControllerService(final ControllerServiceNode service) {
-        service.enable(this.componentLifeCycleThreadPool, 
this.administrativeYieldMillis, this.heartbeater);
+        service.enable(this.componentLifeCycleThreadPool, 
this.administrativeYieldMillis);
     }
 
     @Override
     public void disableControllerService(final ControllerServiceNode service) {
-        service.disable(this.componentLifeCycleThreadPool, this.heartbeater);
+        service.disable(this.componentLifeCycleThreadPool);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 3f24ff1..bed6a35 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -36,7 +36,6 @@ import org.apache.nifi.controller.AbstractConfiguredComponent;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.annotation.OnConfigured;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
@@ -273,8 +272,7 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
      * as it reached ENABLED state.
      */
     @Override
-    public void enable(final ScheduledExecutorService scheduler, final long 
administrativeYieldMillis,
-            final Heartbeater heartbeater) {
+    public void enable(final ScheduledExecutorService scheduler, final long 
administrativeYieldMillis) {
         if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, 
ControllerServiceState.ENABLING)) {
             this.active.set(true);
             final ConfigurationContext configContext = new 
StandardConfigurationContext(this, this.serviceProvider, null);
@@ -287,13 +285,11 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
                         synchronized (active) {
                             shouldEnable = active.get() && 
stateRef.compareAndSet(ControllerServiceState.ENABLING, 
ControllerServiceState.ENABLED);
                         }
-                        if (shouldEnable) {
-                            heartbeater.heartbeat();
-                        } else {
+                        if (!shouldEnable) {
                             LOG.debug("Disabling service " + this + " after it 
has been enabled due to disable action being initiated.");
                             // Can only happen if user initiated DISABLE 
operation before service finished enabling. It's state will be
                             // set to DISABLING (see disable() operation)
-                            invokeDisable(configContext, heartbeater);
+                            invokeDisable(configContext);
                             stateRef.set(ControllerServiceState.DISABLED);
                         }
                     } catch (Exception e) {
@@ -301,7 +297,7 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
                         final ComponentLog componentLog = new 
SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
                         componentLog.error("Failed to invoke @OnEnabled method 
due to {}", cause);
                         LOG.error("Failed to invoke @OnEnabled method of {} 
due to {}", getControllerServiceImplementation(), cause.toString());
-                        invokeDisable(configContext, heartbeater);
+                        invokeDisable(configContext);
 
                         if (isActive()) {
                             scheduler.schedule(this, 
administrativeYieldMillis, TimeUnit.MILLISECONDS);
@@ -323,14 +319,14 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
      * If such transition doesn't succeed (the service is still in ENABLING 
state)
      * then the service will still be transitioned to DISABLING state to 
ensure that
      * no other transition could happen on this service. However in such event
-     * (e.g., its @OnEnabled finally succeeded), the {@link 
#enable(ScheduledExecutorService, long, Heartbeater)}
-     * operation will initiate service disabling javadoc for (see {@link 
#enable(ScheduledExecutorService, long, Heartbeater)}
+     * (e.g., its @OnEnabled finally succeeded), the {@link 
#enable(ScheduledExecutorService, long)}
+     * operation will initiate service disabling javadoc for (see {@link 
#enable(ScheduledExecutorService, long)}
      * <br>
      * Upon successful invocation of @OnDisabled this service will be 
transitioned to
      * DISABLED state.
      */
     @Override
-    public void disable(ScheduledExecutorService scheduler, final Heartbeater 
heartbeater) {
+    public void disable(ScheduledExecutorService scheduler) {
         /*
          * The reason for synchronization is to ensure consistency of the
          * service state when another thread is in the middle of enabling this
@@ -347,10 +343,9 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
                 @Override
                 public void run() {
                     try {
-                        invokeDisable(configContext, heartbeater);
+                        invokeDisable(configContext);
                     } finally {
                         stateRef.set(ControllerServiceState.DISABLED);
-                        heartbeater.heartbeat();
                     }
                 }
             });
@@ -362,7 +357,7 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
     /**
      *
      */
-    private void invokeDisable(ConfigurationContext configContext, Heartbeater 
heartbeater) {
+    private void invokeDisable(ConfigurationContext configContext) {
         try {
             ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, 
StandardControllerServiceNode.this.getControllerServiceImplementation(), 
configContext);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 412e376..d73328f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -105,7 +105,7 @@ public class TestStandardFlowFileQueue {
             }
         }).when(provRepo).registerEvents(Mockito.any(Iterable.class));
 
-        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, 
provRepo, claimManager, scheduler, swapManager, null, 10000, null);
+        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, 
provRepo, claimManager, scheduler, swapManager, null, 10000);
         TestFlowFile.idGenerator.set(0L);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index f8db35e..644018f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -139,7 +139,7 @@ public class TestStandardProcessSession {
         final ProcessScheduler processScheduler = 
Mockito.mock(ProcessScheduler.class);
 
         final FlowFileSwapManager swapManager = 
Mockito.mock(FlowFileSwapManager.class);
-        flowFileQueue = new StandardFlowFileQueue("1", connection, 
flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, 
null);
+        flowFileQueue = new StandardFlowFileQueue("1", connection, 
flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
 
         Mockito.doAnswer(new Answer<Object>() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index c1c7b45..8b079bf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -81,7 +81,7 @@ public class TestWriteAheadFlowFileRepository {
         
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
 
         final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
-        final FlowFileQueue queue = new StandardFlowFileQueue("1234", 
connection, null, null, claimManager, null, swapMgr, null, 10000, null);
+        final FlowFileQueue queue = new StandardFlowFileQueue("1234", 
connection, null, null, claimManager, null, swapMgr, null, 10000);
 
         when(connection.getFlowFileQueue()).thenReturn(queue);
         queueProvider.addConnection(connection);
@@ -156,6 +156,7 @@ public class TestWriteAheadFlowFileRepository {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testRestartWithOneRecord() throws IOException {
         final Path path = Paths.get("target/test-repo");
         if (Files.exists(path)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index f507eea..ed1e7a9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -19,7 +19,6 @@ package org.apache.nifi.controller.scheduling;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -36,10 +35,9 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.components.state.StateManagerProvider;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
@@ -79,7 +77,7 @@ public class TestStandardProcessScheduler {
     public void setup() throws InitializationException {
         System.setProperty("nifi.properties.file.path", 
"src/test/resources/nifi.properties");
         this.refreshNiFiProperties();
-        scheduler = new 
StandardProcessScheduler(Mockito.mock(Heartbeater.class), 
Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider);
+        scheduler = new 
StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, 
stateMgrProvider);
         scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, 
Mockito.mock(SchedulingAgent.class));
 
         reportingTask = new TestReportingTask();
@@ -507,6 +505,6 @@ public class TestStandardProcessScheduler {
     }
 
     private ProcessScheduler createScheduler() {
-        return new StandardProcessScheduler(mock(Heartbeater.class), null, 
null, stateMgrProvider);
+        return new StandardProcessScheduler(null, null, stateMgrProvider);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 0dcacb5..cb147b3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -30,7 +30,6 @@ import java.util.UUID;
 
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
@@ -80,8 +79,7 @@ public class TestStandardControllerServiceProvider {
     }
 
     private ProcessScheduler createScheduler() {
-        final Heartbeater heartbeater = Mockito.mock(Heartbeater.class);
-        return new StandardProcessScheduler(heartbeater, null, null, 
stateManagerProvider);
+        return new StandardProcessScheduler(null, null, stateManagerProvider);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java
index 6197953..d13b5c9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java
@@ -16,20 +16,19 @@
  */
 package org.apache.nifi.web.api;
 
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.api.dto.BulletinBoardDTO;
 import org.apache.nifi.web.api.dto.BulletinQueryDTO;
@@ -39,18 +38,23 @@ import 
org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+
 /**
  * RESTful endpoint for managing a Template.
  */
 @Api(hidden = true)
 public class BulletinBoardResource extends ApplicationResource {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(BulletinBoardResource.class);
+    private NiFiProperties properties;
+    private WebClusterManager clusterManager;
 
     private NiFiServiceFacade serviceFacade;
 
@@ -128,6 +132,11 @@ public class BulletinBoardResource extends 
ApplicationResource {
             )
             @QueryParam("limit") IntegerParameter limit) {
 
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
         // build the bulletin query
         final BulletinQueryDTO query = new BulletinQueryDTO();
 
@@ -171,4 +180,11 @@ public class BulletinBoardResource extends 
ApplicationResource {
         this.serviceFacade = serviceFacade;
     }
 
+    public void setClusterManager(WebClusterManager clusterManager) {
+        this.clusterManager = clusterManager;
+    }
+
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index a349f2a..14d5816 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -439,6 +439,8 @@ public class ControllerFacade {
         final ControllerStatusDTO controllerStatus = new ControllerStatusDTO();
         
controllerStatus.setActiveThreadCount(flowController.getActiveThreadCount());
         
controllerStatus.setQueued(FormatUtils.formatCount(controllerQueueSize.getObjectCount())
 + " / " + FormatUtils.formatDataSize(controllerQueueSize.getByteCount()));
+        controllerStatus.setBytesQueued(controllerQueueSize.getByteCount());
+        
controllerStatus.setFlowFilesQueued(controllerQueueSize.getObjectCount());
 
         final BulletinRepository bulletinRepository = getBulletinRepository();
         
controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 1a57e22..8e46072 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -163,6 +163,8 @@
     </bean>
     <bean id="bulletinBoardResource" 
class="org.apache.nifi.web.api.BulletinBoardResource" scope="singleton">
         <property name="serviceFacade" ref="serviceFacade"/>
+        <property name="properties" ref="nifiProperties"/>
+        <property name="clusterManager" ref="clusterManager"/>
     </bean>
     <bean id="templateResource" 
class="org.apache.nifi.web.api.TemplateResource" scope="singleton">
         <property name="serviceFacade" ref="serviceFacade"/>

Reply via email to