Repository: nifi Updated Branches: refs/heads/NIFI-1563 7f49f8723 -> a901bc65f
NIFI-1563: Bug fixes; code cleanup; replicate requests to bulletin board endpoint Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/deba41d8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/deba41d8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/deba41d8 Branch: refs/heads/NIFI-1563 Commit: deba41d8e0ae86208f0c1d5a50a191f14c1b9ab1 Parents: 7f49f87 Author: Mark Payne <[email protected]> Authored: Mon Mar 7 15:27:21 2016 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Mar 7 15:27:21 2016 -0500 ---------------------------------------------------------------------- .../cluster/manager/impl/WebClusterManager.java | 115 +++++++++++++------ .../org/apache/nifi/controller/Heartbeater.java | 22 ---- .../service/ControllerServiceNode.java | 13 +-- .../nifi/connectable/StandardConnection.java | 9 +- .../apache/nifi/controller/FlowController.java | 7 +- .../nifi/controller/StandardFlowFileQueue.java | 9 +- .../scheduling/StandardProcessScheduler.java | 12 +- .../service/StandardControllerServiceNode.java | 23 ++-- .../controller/TestStandardFlowFileQueue.java | 2 +- .../repository/TestStandardProcessSession.java | 2 +- .../TestWriteAheadFlowFileRepository.java | 3 +- .../TestStandardProcessScheduler.java | 8 +- .../TestStandardControllerServiceProvider.java | 4 +- .../nifi/web/api/BulletinBoardResource.java | 36 ++++-- .../nifi/web/controller/ControllerFacade.java | 2 + .../src/main/resources/nifi-web-api-context.xml | 2 + 16 files changed, 140 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index af62111..3cd8b05 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -126,7 +126,6 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.StandardFlowSerializer; @@ -149,7 +148,6 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.SortedStateUtils; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; -import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; import org.apache.nifi.controller.status.history.MetricDescriptor; import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; @@ -176,7 +174,6 @@ import org.apache.nifi.logging.NiFiLog; import org.apache.nifi.logging.ReportingTaskLogObserver; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.nar.NarThreadContextClassLoader; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.remote.RemoteResourceManager; @@ -202,6 +199,7 @@ import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.OptimisticLockingManager; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UpdateRevision; +import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; @@ -231,6 +229,7 @@ import org.apache.nifi.web.api.dto.status.StatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusMerger; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; +import org.apache.nifi.web.api.entity.BulletinBoardEntity; import org.apache.nifi.web.api.entity.ComponentStateEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; @@ -317,7 +316,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C */ private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5; - public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository"; public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}"); @@ -345,6 +343,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state"); + public static final Pattern BULLETIN_BOARD_URI_PATTERN = Pattern.compile("/nifi-api/controller/bulletin-board"); public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status/history"); @@ -468,11 +467,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C throw new RuntimeException(e); } - processScheduler = new StandardProcessScheduler(new Heartbeater() { - @Override - public void heartbeat() { - } - }, this, encryptor, stateManagerProvider); + processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider); // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only // going to be scheduling Reporting Tasks. Otherwise, it would not be okay. @@ -1870,20 +1865,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } - private ComponentStatusRepository createComponentStatusRepository() { - final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); - if (implementationClassName == null) { - throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); - } - - try { - return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - @Override public Set<Node> getNodes(final Status... statuses) { final Set<Status> desiredStatusSet = new HashSet<>(); @@ -2473,6 +2454,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); } + private static boolean isBulletinBoardEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && BULLETIN_BOARD_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isRemoteProcessGroupEndpoint(final URI uri, final String method) { if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) { @@ -2577,9 +2562,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C || isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method) || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method) || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method) - || isGroupStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method) - || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method) - || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method); + || isGroupStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method) + || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method) + || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method) + || isBulletinBoardEndpoint(uri, method); } private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) { @@ -2750,6 +2736,34 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return mergedBulletins; } + private void mergeBulletinBoard(final BulletinBoardDTO nodeBulletinBoard, final Map<NodeIdentifier, BulletinBoardDTO> resultMap) { + final List<BulletinDTO> bulletinDtos = new ArrayList<>(); + for (final Map.Entry<NodeIdentifier, BulletinBoardDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final BulletinBoardDTO boardDto = entry.getValue(); + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + + for (final BulletinDTO bulletin : boardDto.getBulletins()) { + bulletin.setNodeAddress(nodeAddress); + bulletinDtos.add(bulletin); + } + } + + Collections.sort(bulletinDtos, new Comparator<BulletinDTO>() { + @Override + public int compare(final BulletinDTO o1, final BulletinDTO o2) { + final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp()); + if (timeComparison != 0) { + return timeComparison; + } + + return o1.getNodeAddress().compareTo(o2.getNodeAddress()); + } + }); + + nodeBulletinBoard.setBulletins(bulletinDtos); + } + /** * Creates BulletinDTOs for the specified Bulletins. * @@ -3757,6 +3771,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C mergeControllerStatus(statusRequest, resultsMap); clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isBulletinBoardEndpoint(uri, method)) { + final BulletinBoardEntity responseEntity = clientResponse.getClientResponse().getEntity(BulletinBoardEntity.class); + final BulletinBoardDTO responseDto = responseEntity.getBulletinBoard(); + + final Map<NodeIdentifier, BulletinBoardDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final BulletinBoardEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(BulletinBoardEntity.class); + final BulletinBoardDTO nodeStatus = nodeResponseEntity.getBulletinBoard(); + + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeBulletinBoard(responseDto, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); } else if (hasSuccessfulClientResponse && isProcessorStatusHistoryEndpoint(uri, method)) { final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { @@ -4462,25 +4494,40 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } private List<StatusSnapshotDTO> mergeStatusHistories(final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots, final Map<String, MetricDescriptor<?>> metricDescriptors) { - // Map of "normalized Date" (i.e., a time range, essentially) to all Snapshots for that time. The list - // will contain one snapshot for each node. - final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); + // We want a Map<Date, List<StatusSnapshot>>, which is a Map of "normalized Date" (i.e., a time range, essentially) + // to all Snapshots for that time. The list will contain one snapshot for each node. However, we can have the case + // where the NCM has a different value for the componentStatusSnapshotMillis than the nodes have. In this case, + // we end up with multiple entries in the List<StatusSnapshot> for the same node/timestamp, which skews our aggregate + // results. In order to avoid this, we will use only the latest snapshot for a node that falls into the the time range + // of interest. + // To accomplish this, we have an intermediate data structure, which is a Map of "normalized Date" to an inner Map + // of Node Identifier to StatusSnapshot. We then will flatten this Map and aggregate the results. + final Map<Date, Map<String, StatusSnapshot>> dateToNodeSnapshots = new TreeMap<>(); // group status snapshot's for each node by date for (final NodeStatusSnapshotsDTO nodeStatusSnapshot : nodeStatusSnapshots) { for (final StatusSnapshotDTO snapshotDto : nodeStatusSnapshot.getStatusSnapshots()) { final StatusSnapshot snapshot = createSnapshot(snapshotDto, metricDescriptors); final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); - List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate); - if (snapshots == null) { - snapshots = new ArrayList<>(); - snapshotsToAggregate.put(normalizedDate, snapshots); + + Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.get(normalizedDate); + if (nodeToSnapshotMap == null) { + nodeToSnapshotMap = new HashMap<>(); + dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap); } - snapshots.add(snapshot); + nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), snapshot); } } // aggregate the snapshots by (normalized) timestamp + final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); + for (final Map.Entry<Date, Map<String, StatusSnapshot>> entry : dateToNodeSnapshots.entrySet()) { + final Date normalizedDate = entry.getKey(); + final Map<String, StatusSnapshot> nodeToSnapshot = entry.getValue(); + final List<StatusSnapshot> snapshotsForTimestamp = new ArrayList<>(nodeToSnapshot.values()); + snapshotsToAggregate.put(normalizedDate, snapshotsForTimestamp); + } + final List<StatusSnapshotDTO> aggregatedSnapshots = aggregate(snapshotsToAggregate); return aggregatedSnapshots; } http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java deleted file mode 100644 index 1195bc9..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -public interface Heartbeater { - - void heartbeat(); -} http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index e91ba9a..0d7f3ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.Heartbeater; public interface ControllerServiceNode extends ConfiguredComponent { @@ -64,10 +63,8 @@ public interface ControllerServiceNode extends ConfiguredComponent { * initiate service enabling task as well as its re-tries * @param administrativeYieldMillis * the amount of milliseconds to wait for administrative yield - * @param heartbeater - * the instance of {@link Heartbeater} */ - void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater); + void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis); /** * Will disable this service. Disabling of the service typically means @@ -76,10 +73,8 @@ public interface ControllerServiceNode extends ConfiguredComponent { * @param scheduler * implementation of {@link ScheduledExecutorService} used to * initiate service disabling task - * @param heartbeater - * the instance of {@link Heartbeater} */ - void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater); + void disable(ScheduledExecutorService scheduler); /** * @return the ControllerServiceReference that describes which components are referencing this Controller Service @@ -139,12 +134,12 @@ public interface ControllerServiceNode extends ConfiguredComponent { /** * Returns 'true' if this service is active. The service is considered to be * active if and only if it's - * {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation + * {@link #enable(ScheduledExecutorService, long)} operation * has been invoked and the service has been transitioned to ENABLING state. * The service will also remain 'active' after its been transitioned to * ENABLED state. <br> * The service will be de-activated upon invocation of - * {@link #disable(ScheduledExecutorService, Heartbeater)}. + * {@link #disable(ScheduledExecutorService)}. */ boolean isActive(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 1ef18c0..d43a3db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.StandardFlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -71,7 +70,7 @@ public final class StandardConnection implements Connection { relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); scheduler = builder.scheduler; flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager, - scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold(), builder.heartbeater); + scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold()); hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); } @@ -270,7 +269,6 @@ public final class StandardConnection implements Connection { private FlowFileRepository flowFileRepository; private ProvenanceEventRepository provenanceRepository; private ResourceClaimManager resourceClaimManager; - private Heartbeater heartbeater; public Builder(final ProcessScheduler scheduler) { this.scheduler = scheduler; @@ -306,11 +304,6 @@ public final class StandardConnection implements Connection { return this; } - public Builder heartbeater(final Heartbeater heartbeater) { - this.heartbeater = heartbeater; - return this; - } - public Builder bendPoints(final List<Position> bendPoints) { this.bendPoints.clear(); this.bendPoints.addAll(bendPoints); http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 7ea0408..95c2984 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -213,7 +213,7 @@ import org.slf4j.LoggerFactory; import com.sun.jersey.api.client.ClientHandlerException; -public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider { +public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider { // default repository implementations public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository"; @@ -428,7 +428,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new RuntimeException(e); } - processScheduler = new StandardProcessScheduler(this, this, encryptor, stateManagerProvider); + processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); @@ -824,7 +824,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R .resourceClaimManager(resourceClaimManager) .flowFileRepository(flowFileRepository) .provenanceRepository(provenanceEventRepository) - .heartbeater(this) .build(); } @@ -2890,7 +2889,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public Counter resetCounter(final String identifier) { final CounterRepository counterRepo = counterRepositoryRef.get(); final Counter resetValue = counterRepo.resetCounter(identifier); - heartbeat(); return resetValue; } @@ -3597,7 +3595,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - @Override public void heartbeat() { if (!isClustered()) { return; http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 22aacdc..4afd069 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -109,7 +109,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final FlowFileRepository flowFileRepository; private final ProvenanceEventRepository provRepository; private final ResourceClaimManager resourceClaimManager; - private final Heartbeater heartbeater; private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>(); @@ -118,8 +117,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ProcessScheduler scheduler; public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, - final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold, - final Heartbeater heartbeater) { + final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); priorities = new ArrayList<>(); swapQueue = new ArrayList<>(); @@ -133,7 +131,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { this.swapThreshold = swapThreshold; this.scheduler = scheduler; this.connection = connection; - this.heartbeater = heartbeater; readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100); writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100); @@ -710,6 +707,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } @Override + @SuppressWarnings("deprecation") public int compare(final FlowFileRecord f1, final FlowFileRecord f2) { int returnVal = 0; final boolean f1Penalized = f1.isPenalized(); @@ -1145,9 +1143,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}", dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor); dropRequest.setState(DropFlowFileState.COMPLETE); - if (heartbeater != null) { - heartbeater.heartbeat(); - } } catch (final Exception e) { logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString()); logger.error("", e); http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index ef27fb5..fe1e850 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -40,7 +40,6 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; @@ -73,7 +72,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class); private final ControllerServiceProvider controllerServiceProvider; - private final Heartbeater heartbeater; private final long administrativeYieldMillis; private final String administrativeYieldDuration; private final StateManagerProvider stateManagerProvider; @@ -87,9 +85,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final StringEncryptor encryptor; - public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, + public StandardProcessScheduler(final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManagerProvider stateManagerProvider) { - this.heartbeater = heartbeater; this.controllerServiceProvider = controllerServiceProvider; this.encryptor = encryptor; this.stateManagerProvider = stateManagerProvider; @@ -367,7 +364,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { getSchedulingAgent(procNode).schedule(procNode, scheduleState); - heartbeater.heartbeat(); return; } } catch (final Exception e) { @@ -446,7 +442,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { // If no threads currently running, call the OnStopped methods if (state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext); - heartbeater.heartbeat(); } } } @@ -525,7 +520,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext); - heartbeater.heartbeat(); } } } @@ -636,12 +630,12 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public void enableControllerService(final ControllerServiceNode service) { - service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, this.heartbeater); + service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis); } @Override public void disableControllerService(final ControllerServiceNode service) { - service.disable(this.componentLifeCycleThreadPool, this.heartbeater); + service.disable(this.componentLifeCycleThreadPool); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 3f24ff1..bed6a35 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -36,7 +36,6 @@ import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.annotation.OnConfigured; import org.apache.nifi.controller.exception.ComponentLifeCycleException; @@ -273,8 +272,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i * as it reached ENABLED state. */ @Override - public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, - final Heartbeater heartbeater) { + public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) { if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) { this.active.set(true); final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null); @@ -287,13 +285,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i synchronized (active) { shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED); } - if (shouldEnable) { - heartbeater.heartbeat(); - } else { + if (!shouldEnable) { LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated."); // Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be // set to DISABLING (see disable() operation) - invokeDisable(configContext, heartbeater); + invokeDisable(configContext); stateRef.set(ControllerServiceState.DISABLED); } } catch (Exception e) { @@ -301,7 +297,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this); componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString()); - invokeDisable(configContext, heartbeater); + invokeDisable(configContext); if (isActive()) { scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS); @@ -323,14 +319,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i * If such transition doesn't succeed (the service is still in ENABLING state) * then the service will still be transitioned to DISABLING state to ensure that * no other transition could happen on this service. However in such event - * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long, Heartbeater)} - * operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long, Heartbeater)} + * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long)} + * operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long)} * <br> * Upon successful invocation of @OnDisabled this service will be transitioned to * DISABLED state. */ @Override - public void disable(ScheduledExecutorService scheduler, final Heartbeater heartbeater) { + public void disable(ScheduledExecutorService scheduler) { /* * The reason for synchronization is to ensure consistency of the * service state when another thread is in the middle of enabling this @@ -347,10 +343,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void run() { try { - invokeDisable(configContext, heartbeater); + invokeDisable(configContext); } finally { stateRef.set(ControllerServiceState.DISABLED); - heartbeater.heartbeat(); } } }); @@ -362,7 +357,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i /** * */ - private void invokeDisable(ConfigurationContext configContext, Heartbeater heartbeater) { + private void invokeDisable(ConfigurationContext configContext) { try { ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 412e376..d73328f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -105,7 +105,7 @@ public class TestStandardFlowFileQueue { } }).when(provRepo).registerEvents(Mockito.any(Iterable.class)); - queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, null); + queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); TestFlowFile.idGenerator.set(0L); } http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index f8db35e..644018f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -139,7 +139,7 @@ public class TestStandardProcessSession { final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class); - flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, null); + flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000); when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); Mockito.doAnswer(new Answer<Object>() { http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index c1c7b45..8b079bf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -81,7 +81,7 @@ public class TestWriteAheadFlowFileRepository { when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager(); - final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000, null); + final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000); when(connection.getFlowFileQueue()).thenReturn(queue); queueProvider.addConnection(connection); @@ -156,6 +156,7 @@ public class TestWriteAheadFlowFileRepository { } @Test + @SuppressWarnings("deprecation") public void testRestartWithOneRecord() throws IOException { final Path path = Paths.get("target/test-repo"); if (Files.exists(path)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index f507eea..ed1e7a9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -19,7 +19,6 @@ package org.apache.nifi.controller.scheduling; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; import java.lang.reflect.Field; import java.util.ArrayList; @@ -36,10 +35,9 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.components.state.StateManagerProvider; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; @@ -79,7 +77,7 @@ public class TestStandardProcessScheduler { public void setup() throws InitializationException { System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); this.refreshNiFiProperties(); - scheduler = new StandardProcessScheduler(Mockito.mock(Heartbeater.class), Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider); + scheduler = new StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider); scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class)); reportingTask = new TestReportingTask(); @@ -507,6 +505,6 @@ public class TestStandardProcessScheduler { } private ProcessScheduler createScheduler() { - return new StandardProcessScheduler(mock(Heartbeater.class), null, null, stateMgrProvider); + return new StandardProcessScheduler(null, null, stateMgrProvider); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 0dcacb5..cb147b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -30,7 +30,6 @@ import java.util.UUID; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; @@ -80,8 +79,7 @@ public class TestStandardControllerServiceProvider { } private ProcessScheduler createScheduler() { - final Heartbeater heartbeater = Mockito.mock(Heartbeater.class); - return new StandardProcessScheduler(heartbeater, null, null, stateManagerProvider); + return new StandardProcessScheduler(null, null, stateManagerProvider); } @Test http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java index 6197953..d13b5c9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java @@ -16,20 +16,19 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; @@ -39,18 +38,23 @@ import org.apache.nifi.web.api.request.BulletinBoardPatternParameter; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; + /** * RESTful endpoint for managing a Template. */ @Api(hidden = true) public class BulletinBoardResource extends ApplicationResource { - private static final Logger logger = LoggerFactory.getLogger(BulletinBoardResource.class); + private NiFiProperties properties; + private WebClusterManager clusterManager; private NiFiServiceFacade serviceFacade; @@ -128,6 +132,11 @@ public class BulletinBoardResource extends ApplicationResource { ) @QueryParam("limit") IntegerParameter limit) { + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + // build the bulletin query final BulletinQueryDTO query = new BulletinQueryDTO(); @@ -171,4 +180,11 @@ public class BulletinBoardResource extends ApplicationResource { this.serviceFacade = serviceFacade; } + public void setClusterManager(WebClusterManager clusterManager) { + this.clusterManager = clusterManager; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index a349f2a..14d5816 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -439,6 +439,8 @@ public class ControllerFacade { final ControllerStatusDTO controllerStatus = new ControllerStatusDTO(); controllerStatus.setActiveThreadCount(flowController.getActiveThreadCount()); controllerStatus.setQueued(FormatUtils.formatCount(controllerQueueSize.getObjectCount()) + " / " + FormatUtils.formatDataSize(controllerQueueSize.getByteCount())); + controllerStatus.setBytesQueued(controllerQueueSize.getByteCount()); + controllerStatus.setFlowFilesQueued(controllerQueueSize.getObjectCount()); final BulletinRepository bulletinRepository = getBulletinRepository(); controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController())); http://git-wip-us.apache.org/repos/asf/nifi/blob/deba41d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 1a57e22..8e46072 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -163,6 +163,8 @@ </bean> <bean id="bulletinBoardResource" class="org.apache.nifi.web.api.BulletinBoardResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> + <property name="properties" ref="nifiProperties"/> + <property name="clusterManager" ref="clusterManager"/> </bean> <bean id="templateResource" class="org.apache.nifi.web.api.TemplateResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/>
