http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 2219d6d..bd0f0ab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -16,19 +16,18 @@ */ package org.apache.nifi.controller.service; -import org.apache.nifi.components.ConfigurableComponent; -import org.apache.nifi.components.VersionedComponent; -import org.apache.nifi.controller.ConfiguredComponent; -import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.LoggableComponent; -import org.apache.nifi.groups.ProcessGroup; - import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -public interface ControllerServiceNode extends ConfiguredComponent, ConfigurableComponent, VersionedComponent { +import org.apache.nifi.components.VersionedComponent; +import org.apache.nifi.controller.ComponentNode; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.groups.ProcessGroup; + +public interface ControllerServiceNode extends ComponentNode, VersionedComponent { /** * @return the Process Group that this Controller Service belongs to, or <code>null</code> if the Controller Service @@ -121,13 +120,13 @@ public interface ControllerServiceNode extends ConfiguredComponent, Configurable * Indicates that the given component is now referencing this Controller Service * @param referringComponent the component referencing this service */ - void addReference(ConfiguredComponent referringComponent); + void addReference(ComponentNode referringComponent); /** * Indicates that the given component is no longer referencing this Controller Service * @param referringComponent the component that is no longer referencing this service */ - void removeReference(ConfiguredComponent referringComponent); + void removeReference(ComponentNode referringComponent); void setComments(String comment);
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index ae5416c..56276f4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -24,7 +24,7 @@ import java.util.concurrent.Future; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.bundle.BundleCoordinate; -import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; @@ -138,7 +138,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * * @param serviceNode the node */ - Set<ConfiguredComponent> unscheduleReferencingComponents(ControllerServiceNode serviceNode); + Set<ComponentNode> unscheduleReferencingComponents(ControllerServiceNode serviceNode); /** * Verifies that all Controller Services referencing the provided Controller @@ -159,7 +159,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * * @param serviceNode the node */ - Set<ConfiguredComponent> disableReferencingServices(ControllerServiceNode serviceNode); + Set<ComponentNode> disableReferencingServices(ControllerServiceNode serviceNode); /** * Verifies that all Controller Services referencing the provided @@ -181,7 +181,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * * @return the set of all components that were updated as a result of this action */ - Set<ConfiguredComponent> enableReferencingServices(ControllerServiceNode serviceNode); + Set<ComponentNode> enableReferencingServices(ControllerServiceNode serviceNode); /** * Verifies that all enabled Processors referencing the ControllerService @@ -203,7 +203,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * * @param serviceNode the node */ - Set<ConfiguredComponent> scheduleReferencingComponents(ControllerServiceNode serviceNode); + Set<ComponentNode> scheduleReferencingComponents(ControllerServiceNode serviceNode); /** * http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java index 13c5844..056bcbb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java @@ -19,7 +19,7 @@ package org.apache.nifi.controller.service; import java.util.List; import java.util.Set; -import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.ComponentNode; /** * Provides a collection of components that are referencing a Controller Service @@ -35,7 +35,7 @@ public interface ControllerServiceReference { * @return a {@link Set} of all components that are referencing this * Controller Service */ - Set<ConfiguredComponent> getReferencingComponents(); + Set<ComponentNode> getReferencingComponents(); /** * @return a {@link Set} of all Processors, Reporting Tasks, and Controller @@ -43,7 +43,7 @@ public interface ControllerServiceReference { * the case of Processors and Reporting Tasks) or enabled (in the case of * Controller Services) */ - Set<ConfiguredComponent> getActiveReferences(); + Set<ComponentNode> getActiveReferences(); /** * Returns a List of all components that reference this Controller Service (recursively) that http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index da9374a..c266c64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -25,12 +25,13 @@ import java.util.function.Predicate; import org.apache.nifi.authorization.resource.ComponentAuthorizable; import org.apache.nifi.components.VersionedComponent; +import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Positionable; -import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; @@ -59,7 +60,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi /** * Predicate for starting eligible Processors. */ - Predicate<ProcessorNode> START_PROCESSORS_FILTER = node -> !node.isRunning() && !ScheduledState.DISABLED.equals(node.getScheduledState()) && node.isValid(); + Predicate<ProcessorNode> START_PROCESSORS_FILTER = node -> !node.isRunning() && !ScheduledState.DISABLED.equals(node.getScheduledState()) && node.getValidationStatus() == ValidationStatus.VALID; /** * Predicate for stopping eligible Processors. @@ -399,7 +400,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @return a {@link Collection} of all FlowFileProcessors that are contained * within this. */ - Set<ProcessorNode> getProcessors(); + Collection<ProcessorNode> getProcessors(); /** * Returns the FlowFileProcessor with the given ID. @@ -972,7 +973,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @param variableName the name of the variable * @return a set of all components that are affected by the variable with the given name */ - Set<ConfiguredComponent> getComponentsAffectedByVariable(String variableName); + Set<ComponentNode> getComponentsAffectedByVariable(String variableName); /** * @return the version control information that indicates where this flow is stored in a Flow Registry, http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java new file mode 100644 index 0000000..4102eca --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import static org.junit.Assert.assertEquals; + +import java.net.URL; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestAbstractComponentNode { + + @Test(timeout = 5000) + public void testGetValidationStatusWithTimeout() { + final ValidationControlledAbstractComponentNode node = new ValidationControlledAbstractComponentNode(); + final ValidationStatus status = node.getValidationStatus(1, TimeUnit.MILLISECONDS); + assertEquals(ValidationStatus.VALIDATING, status); + } + + + private static class ValidationControlledAbstractComponentNode extends AbstractComponentNode { + + public ValidationControlledAbstractComponentNode() { + super("id", Mockito.mock(ValidationContextFactory.class), Mockito.mock(ControllerServiceProvider.class), "unit test component", + ValidationControlledAbstractComponentNode.class.getCanonicalName(), Mockito.mock(ComponentVariableRegistry.class), Mockito.mock(ReloadComponent.class), + Mockito.mock(ValidationTrigger.class), false); + } + + @Override + protected Collection<ValidationResult> computeValidationErrors(ValidationContext context) { + try { + Thread.sleep(5000L); + } catch (final InterruptedException ie) { + } + + return null; + } + + @Override + public void reload(Set<URL> additionalUrls) throws Exception { + } + + @Override + public BundleCoordinate getBundleCoordinate() { + return null; + } + + @Override + public ConfigurableComponent getComponent() { + return null; + } + + @Override + public TerminationAwareLogger getLogger() { + return null; + } + + @Override + public Class<?> getComponentClass() { + return ValidationControlledAbstractComponentNode.class; + } + + @Override + public boolean isRestricted() { + return false; + } + + @Override + public boolean isDeprecated() { + return false; + } + + @Override + public boolean isValidationNecessary() { + return true; + } + + @Override + public String getProcessGroupIdentifier() { + return "1234"; + } + + @Override + public Authorizable getParentAuthorizable() { + return null; + } + + @Override + public Resource getResource() { + return null; + } + + @Override + public void verifyModifiable() throws IllegalStateException { + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardValidationTrigger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardValidationTrigger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardValidationTrigger.java new file mode 100644 index 0000000..4f1f7dc --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardValidationTrigger.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.validation; + +import java.util.concurrent.ExecutorService; +import java.util.function.BooleanSupplier; + +import org.apache.nifi.controller.ComponentNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StandardValidationTrigger implements ValidationTrigger { + private static final Logger logger = LoggerFactory.getLogger(StandardValidationTrigger.class); + + private final ExecutorService threadPool; + private final BooleanSupplier flowInitialized; + + public StandardValidationTrigger(final ExecutorService threadPool, final BooleanSupplier flowInitialized) { + this.threadPool = threadPool; + this.flowInitialized = flowInitialized; + } + + @Override + public void triggerAsync(final ComponentNode component) { + if (!flowInitialized.getAsBoolean()) { + logger.debug("Triggered to perform validation on {} asynchronously but flow is not yet initialized so will ignore validation", component); + return; + } + + threadPool.submit(() -> trigger(component)); + } + + @Override + public void trigger(final ComponentNode component) { + try { + if (component.isValidationNecessary()) { + component.performValidation(); + } + } catch (final Throwable t) { + component.getLogger().error("Failed to perform validation due to " + t, t); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java new file mode 100644 index 0000000..0665dd2 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.validation; + +import org.apache.nifi.controller.ComponentNode; +import org.apache.nifi.controller.FlowController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TriggerValidationTask implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(TriggerValidationTask.class); + + private final FlowController controller; + private final ValidationTrigger validationTrigger; + + public TriggerValidationTask(final FlowController controller, final ValidationTrigger validationTrigger) { + this.controller = controller; + this.validationTrigger = validationTrigger; + } + + @Override + public void run() { + try { + logger.debug("Triggering validation of all components"); + + for (final ComponentNode node : controller.getAllControllerServices()) { + validationTrigger.trigger(node); + } + + for (final ComponentNode node : controller.getAllReportingTasks()) { + validationTrigger.trigger(node); + } + + for (final ComponentNode node : controller.getRootGroup().findAllProcessors()) { + validationTrigger.trigger(node); + } + } catch (final Throwable t) { + logger.error("Encountered unexpected error when attempting to validate components", t); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 92ab7b3..091f0fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,6 +16,41 @@ */ package org.apache.nifi.controller; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; + import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -52,6 +87,10 @@ import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.components.validation.StandardValidationTrigger; +import org.apache.nifi.components.validation.TriggerValidationTask; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.components.validation.ValidationTrigger; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -99,8 +138,8 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent; -import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; +import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; import org.apache.nifi.controller.serialization.FlowSerializationException; @@ -206,6 +245,7 @@ import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.SnippetUtils; +import org.apache.nifi.util.concurrency.TimedLock; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.BundleDTO; @@ -229,41 +269,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent { @@ -343,6 +348,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final LeaderElectionManager leaderElectionManager; private final ClusterCoordinator clusterCoordinator; private final FlowRegistryClient flowRegistryClient; + private final FlowEngine validationThreadPool; + private final ValidationTrigger validationTrigger; /** * true if controller is configured to operate in a clustered environment @@ -397,8 +404,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private volatile boolean shutdown = false; private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - private final Lock writeLock = rwLock.writeLock(); + private final TimedLock readLock = new TimedLock(rwLock.readLock(), "FlowControllerReadLock", 1); + private final TimedLock writeLock = new TimedLock(rwLock.writeLock(), "FlowControllerWriteLock", 1); private static final Logger LOG = LoggerFactory.getLogger(FlowController.class); @@ -460,6 +467,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return flowController; } + @SuppressWarnings("deprecation") private FlowController( final FlowFileEventRepository flowFileEventRepo, final NiFiProperties nifiProperties, @@ -568,7 +576,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R setRootGroup(rootGroup); instanceId = ComponentIdGenerator.generateId().toString(); - controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry, this.nifiProperties); + this.validationThreadPool = new FlowEngine(5, "Validate Components", true); + this.validationTrigger = new StandardValidationTrigger(validationThreadPool, this::isInitialized); + + controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, + this.variableRegistry, this.nifiProperties, validationTrigger); if (remoteInputSocketPort == null) { LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set"); @@ -793,7 +805,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R initialized.set(true); } finally { - writeLock.unlock(); + writeLock.unlock("initializeFlow"); } } @@ -833,6 +845,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public void onFlowInitialized(final boolean startDelayedComponents) { writeLock.lock(); try { + // Perform validation of all components before attempting to start them. + LOG.debug("Triggering initial validation of all components"); + final long start = System.nanoTime(); + new TriggerValidationTask(this, validationTrigger).run(); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + LOG.info("Performed initial validation of all components in {} milliseconds", millis); + + // Trigger component validation to occur every 5 seconds. + validationThreadPool.scheduleWithFixedDelay(new TriggerValidationTask(this, validationTrigger), 5, 5, TimeUnit.SECONDS); + if (startDelayedComponents) { LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()); for (final Connectable connectable : startConnectablesAfterInitialization) { @@ -842,6 +864,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { if (connectable instanceof ProcessorNode) { + ((ProcessorNode) connectable).getValidationStatus(5, TimeUnit.SECONDS); connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true); } else { startConnectable(connectable); @@ -885,7 +908,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R startRemoteGroupPortsAfterInitialization.clear(); } } finally { - writeLock.unlock(); + writeLock.unlock("onFlowInitialized"); } } @@ -1150,12 +1173,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ProcessorNode procNode; if (creationSuccessful) { procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, - nifiProperties, componentVarRegistry, this); + nifiProperties, componentVarRegistry, this, validationTrigger); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, - componentType, type, nifiProperties, componentVarRegistry, this, true); + componentType, type, nifiProperties, componentVarRegistry, this, validationTrigger, true); } if (registerLogObserver) { @@ -1209,6 +1232,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + validationTrigger.triggerAsync(procNode); return procNode; } @@ -1288,6 +1312,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // need to refresh the properties in case we are changing from ghost component to real component existingNode.refreshProperties(); + + validationTrigger.triggerAsync(existingNode); } /** @@ -1303,7 +1329,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { return instanceId; } finally { - readLock.unlock(); + readLock.unlock("getInstanceId"); } } @@ -1451,7 +1477,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated(); } finally { - this.readLock.unlock(); + this.readLock.unlock("isTerminated"); } } @@ -1495,6 +1521,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds + " seconds"); } + validationThreadPool.shutdown(); clusterTaskExecutor.shutdownNow(); if (zooKeeperStateServer != null) { @@ -1565,7 +1592,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } } finally { - readLock.unlock(); + readLock.unlock("shutdown"); } } @@ -1577,7 +1604,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws FlowSerializationException if serialization of the flow fails for * any reason */ - public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException { + public synchronized <T> void serialize(final FlowSerializer<T> serializer, final OutputStream os) throws FlowSerializationException { + T flowConfiguration; + readLock.lock(); try { final ScheduledStateLookup scheduledStateLookup = new ScheduledStateLookup() { @@ -1603,10 +1632,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } }; - serializer.serialize(this, os, scheduledStateLookup); + flowConfiguration = serializer.transform(this, scheduledStateLookup); } finally { - readLock.unlock(); + readLock.unlock("serialize"); } + + serializer.serialize(flowConfiguration, os); } /** @@ -1639,7 +1670,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R flowSynchronized.set(true); LOG.info("Successfully synchronized controller with proposed flow"); } finally { - writeLock.unlock(); + writeLock.unlock("synchronize"); } } @@ -1668,7 +1699,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { setMaxThreadCount(maxThreadCount, this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads); } finally { - writeLock.unlock(); + writeLock.unlock("setMaxTimerDrivenThreadCount"); } } @@ -1678,7 +1709,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R setMaxThreadCount(maxThreadCount, this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads); processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, maxThreadCount); } finally { - writeLock.unlock(); + writeLock.unlock("setMaxEventDrivenThreadCount"); } } @@ -1732,7 +1763,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R allProcessGroups.put(group.getIdentifier(), group); allProcessGroups.put(ROOT_GROUP_ID_ALIAS, group); } finally { - writeLock.unlock(); + writeLock.unlock("setRootGroup"); } } @@ -1819,16 +1850,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException { + validateSnippetContents(requireNonNull(group), dto); writeLock.lock(); try { - validateSnippetContents(requireNonNull(group), dto); - // // Instantiate Controller Services // for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) { final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(controllerServiceDTO.getType(), controllerServiceDTO.getBundle()); - final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(),true); + final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true); serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData()); serviceNode.setComments(controllerServiceDTO.getComments()); @@ -2155,7 +2185,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R group.addConnection(connection); } } finally { - writeLock.unlock(); + writeLock.unlock("instantiateSnippet"); } } @@ -2732,6 +2762,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** + * Returns the status for components in the specified group. This request is + * made by the specified user so the results will be filtered accordingly. + * + * @param groupId group id + * @param user user making request + * @return the component status + */ + public ProcessGroupStatus getGroupStatus(final String groupId, final NiFiUser user, final int recursiveStatusDepth) { + return getGroupStatus(groupId, getProcessorStats(), user, recursiveStatusDepth); + } + + /** * Returns the status for the components in the specified group with the * specified report. This request is not in the context of a user so the * results will be unfiltered. @@ -2744,7 +2786,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ProcessGroup group = getGroup(groupId); // this was invoked with no user context so the results will be unfiltered... necessary for aggregating status history - return getGroupStatus(group, statusReport, authorizable -> true); + return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1); } /** @@ -2761,7 +2803,26 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ProcessGroup group = getGroup(groupId); // on demand status request for a specific user... require authorization per component and filter results as appropriate - return getGroupStatus(group, statusReport, authorizable -> authorizable.isAuthorized(authorizer, RequestAction.READ, user)); + return getGroupStatus(group, statusReport, authorizable -> authorizable.isAuthorized(authorizer, RequestAction.READ, user), Integer.MAX_VALUE, 1); + } + + + /** + * Returns the status for the components in the specified group with the + * specified report. This request is made by the specified user so the + * results will be filtered accordingly. + * + * @param groupId group id + * @param statusReport report + * @param user user making request + * @param recursiveStatusDepth the number of levels deep we should recurse and still include the the processors' statuses, the groups' statuses, etc. in the returned ProcessGroupStatus + * @return the component status + */ + public ProcessGroupStatus getGroupStatus(final String groupId, final RepositoryStatusReport statusReport, final NiFiUser user, final int recursiveStatusDepth) { + final ProcessGroup group = getGroup(groupId); + + // on demand status request for a specific user... require authorization per component and filter results as appropriate + return getGroupStatus(group, statusReport, authorizable -> authorizable.isAuthorized(authorizer, RequestAction.READ, user), recursiveStatusDepth, 1); } /** @@ -2772,9 +2833,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @param group group id * @param statusReport report * @param isAuthorized is authorized check + * @param recursiveStatusDepth the number of levels deep we should recurse and still include the the processors' statuses, the groups' statuses, etc. in the returned ProcessGroupStatus + * @param currentDepth the current number of levels deep that we have recursed * @return the component status */ - public ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized) { + private ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized, + final int recursiveStatusDepth, final int currentDepth) { if (group == null) { return null; } @@ -2799,12 +2863,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R int flowFilesTransferred = 0; long bytesTransferred = 0; + final boolean populateChildStatuses = currentDepth <= recursiveStatusDepth; + // set status for processors final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>(); status.setProcessorStatus(processorStatusCollection); for (final ProcessorNode procNode : group.getProcessors()) { final ProcessorStatus procStat = getProcessorStatus(statusReport, procNode, isAuthorized); - processorStatusCollection.add(procStat); + if (populateChildStatuses) { + processorStatusCollection.add(procStat); + } activeGroupThreads += procStat.getActiveThreadCount(); terminatedGroupThreads += procStat.getTerminatedThreadCount(); bytesRead += procStat.getBytesRead(); @@ -2820,8 +2888,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final Collection<ProcessGroupStatus> localChildGroupStatusCollection = new ArrayList<>(); status.setProcessGroupStatus(localChildGroupStatusCollection); for (final ProcessGroup childGroup : group.getProcessGroups()) { - final ProcessGroupStatus childGroupStatus = getGroupStatus(childGroup, statusReport, isAuthorized); - localChildGroupStatusCollection.add(childGroupStatus); + final ProcessGroupStatus childGroupStatus; + if (populateChildStatuses) { + childGroupStatus = getGroupStatus(childGroup, statusReport, isAuthorized, recursiveStatusDepth, currentDepth + 1); + localChildGroupStatusCollection.add(childGroupStatus); + } else { + // In this case, we don't want to include any of the recursive components' individual statuses. As a result, we can + // avoid performing any sort of authorizations. Because we only care about the numbers that come back, we can just indicate + // that the user is not authorized. This allows us to avoid the expense of both performing the authorization and calculating + // things that we would otherwise need to calculate if the user were in fact authorized. + childGroupStatus = getGroupStatus(childGroup, statusReport, authorizable -> false, recursiveStatusDepth, currentDepth + 1); + } + activeGroupThreads += childGroupStatus.getActiveThreadCount(); terminatedGroupThreads += childGroupStatus.getTerminatedThreadCount(); bytesRead += childGroupStatus.getBytesRead(); @@ -2844,7 +2922,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { final RemoteProcessGroupStatus remoteStatus = createRemoteGroupStatus(remoteGroup, statusReport, isAuthorized); if (remoteStatus != null) { - remoteProcessGroupStatusCollection.add(remoteStatus); + if (populateChildStatuses) { + remoteProcessGroupStatusCollection.add(remoteStatus); + } flowFilesReceived += remoteStatus.getReceivedCount(); bytesReceived += remoteStatus.getReceivedContentSize(); @@ -2905,7 +2985,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R connStatus.setQueuedBytes(connectionQueuedBytes); connStatus.setQueuedCount(connectionQueuedCount); } - connectionStatusCollection.add(connStatus); + + if (populateChildStatuses) { + connectionStatusCollection.add(connStatus); + } + queuedCount += connectionQueuedCount; queuedContentSize += connectionQueuedBytes; @@ -2978,7 +3062,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R bytesReceived += entry.getBytesReceived(); } - inputPortStatusCollection.add(portStatus); + if (populateChildStatuses) { + inputPortStatusCollection.add(portStatus); + } + activeGroupThreads += portStatus.getActiveThreadCount(); } @@ -3039,7 +3126,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R bytesSent += entry.getBytesSent(); } - outputPortStatusCollection.add(portStatus); + if (populateChildStatuses) { + outputPortStatusCollection.add(portStatus); + } + activeGroupThreads += portStatus.getActiveThreadCount(); } @@ -3216,7 +3306,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R status.setRunStatus(RunStatus.Disabled); } else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) { status.setRunStatus(RunStatus.Running); - } else if (!procNode.isValid()) { + } else if (procNode.getValidationStatus() == ValidationStatus.INVALID) { status.setRunStatus(RunStatus.Invalid); } else { status.setRunStatus(RunStatus.Stopped); @@ -3248,7 +3338,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R startConnectablesAfterInitialization.add(node); } } finally { - writeLock.unlock(); + writeLock.unlock("startProcessor"); } } @@ -3285,7 +3375,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R startConnectablesAfterInitialization.add(connectable); } } finally { - writeLock.unlock(); + writeLock.unlock("startConnectable"); } } @@ -3312,7 +3402,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new IllegalArgumentException(); } } finally { - writeLock.unlock(); + writeLock.unlock("stopConnectable"); } } @@ -3325,7 +3415,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R startRemoteGroupPortsAfterInitialization.add(remoteGroupPort); } } finally { - writeLock.unlock(); + writeLock.unlock("startTransmitting"); } } @@ -3393,12 +3483,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVarRegistry); final ReportingTaskNode taskNode; if (creationSuccessful) { - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentVarRegistry, this); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentVarRegistry, this, validationTrigger); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, componentVarRegistry, this, true); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, componentVarRegistry, this, validationTrigger, true); } taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName()); @@ -3429,6 +3519,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R new ReportingTaskLogObserver(getBulletinRepository(), taskNode)); } + validationTrigger.triggerAsync(taskNode); return taskNode; } @@ -3503,6 +3594,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // need to refresh the properties in case we are changing from ghost component to real component existingNode.refreshProperties(); + + validationTrigger.triggerAsync(existingNode); } @Override @@ -3515,6 +3608,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (isTerminated()) { throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode.getIdentifier() + " because the controller is terminated"); } + + reportingTaskNode.performValidation(); // ensure that the reporting task has completed its validation before attempting to start it reportingTaskNode.verifyCanStart(); reportingTaskNode.reloadAdditionalResourcesIfNecessary(); processScheduler.schedule(reportingTaskNode); @@ -3558,6 +3653,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R reportingTasks.remove(reportingTaskNode.getIdentifier()); LogRepositoryFactory.removeRepository(reportingTaskNode.getIdentifier()); + processScheduler.onReportingTaskRemoved(reportingTaskNode); + ExtensionManager.removeInstanceClassLoader(reportingTaskNode.getIdentifier()); } @@ -3589,6 +3686,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + validationTrigger.triggerAsync(serviceNode); return serviceNode; } @@ -3641,6 +3739,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // need to refresh the properties in case we are changing from ghost component to real component existingNode.refreshProperties(); + + validationTrigger.triggerAsync(existingNode); } @Override @@ -3657,22 +3757,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) { + public Set<ComponentNode> disableReferencingServices(final ControllerServiceNode serviceNode) { return controllerServiceProvider.disableReferencingServices(serviceNode); } @Override - public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) { + public Set<ComponentNode> enableReferencingServices(final ControllerServiceNode serviceNode) { return controllerServiceProvider.enableReferencingServices(serviceNode); } @Override - public Set<ConfiguredComponent> scheduleReferencingComponents(final ControllerServiceNode serviceNode) { + public Set<ComponentNode> scheduleReferencingComponents(final ControllerServiceNode serviceNode) { return controllerServiceProvider.scheduleReferencingComponents(serviceNode); } @Override - public Set<ConfiguredComponent> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { + public Set<ComponentNode> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { return controllerServiceProvider.unscheduleReferencingComponents(serviceNode); } @@ -3966,7 +4066,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.heartbeatSendTask.set(sendTask); heartbeatSenderFuture = clusterTaskExecutor.scheduleWithFixedDelay(sendTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS); } finally { - writeLock.unlock(); + writeLock.unlock("startHeartbeating"); } } @@ -4014,7 +4114,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R heartbeatSenderFuture.cancel(false); } } finally { - writeLock.unlock(); + writeLock.unlock("stopHeartbeating"); } } @@ -4027,7 +4127,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { return heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled(); } finally { - readLock.unlock(); + readLock.unlock("isHeartbeating"); } } @@ -4039,7 +4139,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { return heartbeatDelaySeconds; } finally { - readLock.unlock(); + readLock.unlock("getHeartbeatDelaySeconds"); } } @@ -4072,7 +4172,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { return clustered; } finally { - readLock.unlock(); + readLock.unlock("isClustered"); } } @@ -4087,6 +4187,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderRelinquish() { LOG.info("This node is no longer the elected Active Cluster Coordinator"); + bulletinRepository.addBulletin(BulletinFactory.createBulletin("Cluster Coordinator", Severity.INFO.name(), participantId + " is no longer the Cluster Coordinator")); // We do not want to stop the heartbeat monitor. This is because even though ZooKeeper offers guarantees // that watchers will see changes on a ZNode in the order they happened, there does not seem to be any @@ -4101,6 +4202,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderElection() { LOG.info("This node elected Active Cluster Coordinator"); + bulletinRepository.addBulletin(BulletinFactory.createBulletin("Cluster Coordinator", Severity.INFO.name(), participantId + " has been elected the Cluster Coordinator")); // Purge any heartbeats that we already have. If we don't do this, we can have a scenario where we receive heartbeats // from a node, and then another node becomes Cluster Coordinator. As a result, we stop receiving heartbeats. Now that @@ -4192,7 +4294,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // update the heartbeat bean this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary())); } finally { - writeLock.unlock(); + writeLock.unlock("setClustered"); } } @@ -4713,7 +4815,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary()); } finally { - readLock.unlock(); + readLock.unlock("createHeartbeatMessage"); } }
