This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.16 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 126ab0b8d8ff3f14dfb56d1c8ec6fb71351e4234 Author: markap14 <[email protected]> AuthorDate: Fri Apr 1 14:28:38 2022 -0400 NIFI-9853: Refactored StandardProcessGroupSynchronizer to make use of… (#5919) * NIFI-9853: Refactored StandardProcessGroupSynchronizer to make use of State Lookups and Component Schedulers to ensure that we properly synchronize states when starting up, when exporting flow definitions, and when importing Flow Definitions * NIFI-9853: Fixed NPE --- .../nifi/groups/DefaultComponentScheduler.java | 15 ++ .../RetainExistingStateComponentScheduler.java | 145 ++++++++++++++++++ .../apache/nifi/groups/StandardProcessGroup.java | 8 +- .../groups/StandardProcessGroupSynchronizer.java | 84 +---------- .../nifi/groups/AbstractComponentScheduler.java | 162 +++++++++++++++++++++ .../org/apache/nifi/groups/ComponentScheduler.java | 28 ++++ .../mapping/VersionedComponentStateLookup.java | 9 +- .../org/apache/nifi/controller/FlowController.java | 98 ++++++++++--- .../serialization/VersionedFlowSynchronizer.java | 22 ++- 9 files changed, 460 insertions(+), 111 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java index ee7fa9b003..a54ec61ab6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java @@ -20,9 +20,19 @@ package org.apache.nifi.groups; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ProcessorNode; +import 
org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup; import org.apache.nifi.remote.RemoteGroupPort; +import java.util.Collection; + public class DefaultComponentScheduler extends AbstractComponentScheduler { + + public DefaultComponentScheduler(final ControllerServiceProvider controllerServiceProvider, final VersionedComponentStateLookup stateLookup) { + super(controllerServiceProvider, stateLookup); + } + @Override protected void startNow(final Connectable component) { switch (component.getConnectableType()) { @@ -48,4 +58,9 @@ public class DefaultComponentScheduler extends AbstractComponentScheduler { } } } + + @Override + protected void enableNow(final Collection<ControllerServiceNode> controllerServices) { + getControllerServiceProvider().enableControllerServices(controllerServices); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java new file mode 100644 index 0000000000..b6ed741889 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.Triggerable; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class RetainExistingStateComponentScheduler implements ComponentScheduler { + private static final Logger logger = LoggerFactory.getLogger(RetainExistingStateComponentScheduler.class); + + private final ComponentScheduler delegate; + private final Map<String, ScheduledState> connectableStates; + private final Map<String, ControllerServiceState> controllerServiceStates; + + public RetainExistingStateComponentScheduler(final ProcessGroup processGroup, final ComponentScheduler delegate) { + this.delegate = delegate; + this.connectableStates = mapConnectableStates(processGroup); + this.controllerServiceStates = mapControllerServiceStates(processGroup); + } + + @Override + public void startComponent(final Connectable component) { + final ScheduledState existingState = connectableStates.get(component.getIdentifier()); + if (existingState == null) { + logger.debug("Will not start {} because it was not previously known in this Process Group", component); + return; + } + + if (existingState != ScheduledState.RUNNING && existingState != 
ScheduledState.STARTING) { + logger.debug("Will not start {} because its previous state was {}", component, existingState); + return; + } + + logger.debug("Starting {}", component); + delegate.startComponent(component); + } + + @Override + public void stopComponent(final Connectable component) { + delegate.stopComponent(component); + } + + @Override + public void transitionComponentState(final Connectable component, final org.apache.nifi.flow.ScheduledState desiredState) { + delegate.transitionComponentState(component, desiredState); + } + + @Override + public void enableControllerServicesAsync(final Collection<ControllerServiceNode> controllerServices) { + final Set<ControllerServiceNode> toEnable = new HashSet<>(); + + for (final ControllerServiceNode service : controllerServices) { + final ControllerServiceState existingState = controllerServiceStates.get(service.getIdentifier()); + + if (existingState == null) { + logger.debug("Will not enable {} because it was not previously known in this Process Group", service); + continue; + } + + if (existingState != ControllerServiceState.ENABLED && existingState != ControllerServiceState.ENABLING) { + logger.debug("Will not enable {} because its previously state was {}", service, existingState); + continue; + } + + toEnable.add(service); + } + + logger.debug("Enabling {}", toEnable); + delegate.enableControllerServicesAsync(toEnable); + } + + @Override + public void disableControllerServicesAsync(final Collection<ControllerServiceNode> controllerServices) { + delegate.disableControllerServicesAsync(controllerServices); + } + + @Override + public void pause() { + delegate.pause(); + } + + @Override + public void resume() { + delegate.resume(); + } + + private Map<String, ControllerServiceState> mapControllerServiceStates(final ProcessGroup group) { + final Set<ControllerServiceNode> services = group.findAllControllerServices(); + final Map<String, ControllerServiceState> serviceStates = services.stream() + 
.collect(Collectors.toMap(ControllerServiceNode::getIdentifier, ControllerServiceNode::getState)); + + return serviceStates; + } + + private Map<String, ScheduledState> mapConnectableStates(final ProcessGroup group) { + final Set<Connectable> connectables = new HashSet<>(); + findAllConnectables(group, connectables); + + final Map<String, ScheduledState> connectableStates = connectables.stream() + .collect(Collectors.toMap(Connectable::getIdentifier, Triggerable::getScheduledState)); + + return connectableStates; + } + + private void findAllConnectables(final ProcessGroup group, final Set<Connectable> connectables) { + connectables.addAll(group.getInputPorts()); + connectables.addAll(group.getOutputPorts()); + connectables.addAll(group.getFunnels()); + connectables.addAll(group.getProcessors()); + for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { + connectables.addAll(remoteGroup.getInputPorts()); + connectables.addAll(remoteGroup.getOutputPorts()); + } + + for (final ProcessGroup childGroup : group.getProcessGroups()) { + findAllConnectables(childGroup, connectables); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 58505f9cd5..1924f9bcfb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -3783,11 +3783,13 @@ public final class StandardProcessGroup implements ProcessGroup { final boolean updateDescendantVersionedFlows) { final ComponentIdGenerator idGenerator = (proposedId, instanceId, destinationGroupId) -> 
generateUuid(proposedId, destinationGroupId, componentIdSeed); - final ComponentScheduler componentScheduler = new DefaultComponentScheduler(); + final VersionedComponentStateLookup stateLookup = VersionedComponentStateLookup.ENABLED_OR_DISABLED; + final ComponentScheduler defaultComponentScheduler = new DefaultComponentScheduler(controllerServiceProvider, stateLookup); + final ComponentScheduler retainExistingStateScheduler = new RetainExistingStateComponentScheduler(this, defaultComponentScheduler); final GroupSynchronizationOptions synchronizationOptions = new GroupSynchronizationOptions.Builder() .componentIdGenerator(idGenerator) - .componentScheduler(componentScheduler) + .componentScheduler(retainExistingStateScheduler) .ignoreLocalModifications(!verifyNotDirty) .updateDescendantVersionedFlows(updateDescendantVersionedFlows) .updateGroupSettings(updateSettings) @@ -3800,7 +3802,7 @@ public final class StandardProcessGroup implements ProcessGroup { final FlowMappingOptions flowMappingOptions = new FlowMappingOptions.Builder() .mapSensitiveConfiguration(false) .mapPropertyDescriptors(true) - .stateLookup(VersionedComponentStateLookup.ENABLED_OR_DISABLED) + .stateLookup(stateLookup) .sensitiveValueEncryptor(null) .componentIdLookup(ComponentIdLookup.VERSIONED_OR_GENERATE) .mapInstanceIdentifiers(false) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java index 3761f65f73..cefaf22ada 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java @@ -502,8 +502,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize // therefore is not valid until all have been created. toEnable.forEach(service -> { if (service.getState() == ControllerServiceState.DISABLED) { - LOG.debug("Enabling {}", service); - context.getControllerServiceProvider().enableControllerServicesAsync(Collections.singleton(service)); + context.getComponentScheduler().enableControllerServicesAsync(Collections.singleton(service)); } }); } @@ -1447,52 +1446,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize port.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); port.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount()); - final org.apache.nifi.flow.ScheduledState scheduledState = proposed.getScheduledState() == null ? 
org.apache.nifi.flow.ScheduledState.ENABLED : proposed.getScheduledState(); - - final ProcessGroup group = port.getProcessGroup(); - if (port.getConnectableType() == ConnectableType.INPUT_PORT) { - switch (scheduledState) { - case DISABLED: - group.disableInputPort(port); - break; - case ENABLED: - if (port.getScheduledState() == ScheduledState.DISABLED) { - group.enableInputPort(port); - } else if (port.getScheduledState() == ScheduledState.RUNNING) { - group.stopInputPort(port); - } - break; - case RUNNING: - if (port.getScheduledState() == ScheduledState.DISABLED) { - group.enableInputPort(port); - } - if (port.getScheduledState() == ScheduledState.STOPPED) { - context.getComponentScheduler().startComponent(port); - } - break; - } - } else if (port.getConnectableType() == ConnectableType.OUTPUT_PORT) { - switch (scheduledState) { - case DISABLED: - group.disableOutputPort(port); - break; - case ENABLED: - if (port.getScheduledState() == ScheduledState.DISABLED) { - group.enableOutputPort(port); - } else if (port.getScheduledState() == ScheduledState.RUNNING) { - group.stopOutputPort(port); - } - break; - case RUNNING: - if (port.getScheduledState() == ScheduledState.DISABLED) { - group.enableOutputPort(port); - } - if (port.getScheduledState() == ScheduledState.STOPPED) { - context.getComponentScheduler().startComponent(port); - } - break; - } - } + context.getComponentScheduler().transitionComponentState(port, proposed.getScheduledState()); } private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final ComponentIdGenerator componentIdGenerator, final String temporaryName) { @@ -1606,36 +1560,8 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize processor.setBackoffMechanism(BackoffMechanism.valueOf(proposed.getBackoffMechanism())); } - final ScheduledState procState = processor.getScheduledState(); - final ProcessGroup group = processor.getProcessGroup(); - switch (proposed.getScheduledState()) { 
- case DISABLED: - if (procState == ScheduledState.RUNNING) { - LOG.debug("Stopping {}", processor); - group.stopProcessor(processor); - } - - LOG.debug("Disabling {}", processor); - group.disableProcessor(processor); - break; - case ENABLED: - if (procState == ScheduledState.DISABLED) { - LOG.debug("Enabling {}", processor); - group.enableProcessor(processor); - } else if (procState == ScheduledState.RUNNING) { - LOG.debug("Stopping {}", processor); - group.stopProcessor(processor); - } - break; - case RUNNING: - if (procState == ScheduledState.DISABLED) { - LOG.debug("Enabling {}", processor); - group.enableProcessor(processor); - } - LOG.debug("Starting {}", processor); - context.getComponentScheduler().startComponent(processor); - break; - } + // Transition state to disabled/enabled/running + context.getComponentScheduler().transitionComponentState(processor, proposed.getScheduledState()); if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) { final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); @@ -1751,7 +1677,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize } } else { if (portState == ScheduledState.RUNNING) { - remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort); + context.getComponentScheduler().stopComponent(remoteGroupPort); } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java index 3add0ac472..d1f6cc1ac0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java @@ -18,9 +18,17 @@ package org.apache.nifi.groups; import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.flow.ScheduledState; +import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup; +import org.apache.nifi.remote.RemoteGroupPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; @@ -28,8 +36,17 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractComponentScheduler implements ComponentScheduler { private static final Logger logger = LoggerFactory.getLogger(AbstractComponentScheduler.class); + private final ControllerServiceProvider serviceProvider; + private final VersionedComponentStateLookup stateLookup; + private final AtomicLong pauseCount = new AtomicLong(0L); private final Queue<Connectable> toStart = new LinkedBlockingQueue<>(); + private final Queue<ControllerServiceNode> toEnable = new LinkedBlockingQueue<>(); + + public AbstractComponentScheduler(final ControllerServiceProvider controllerServiceProvider, final VersionedComponentStateLookup stateLookup) { + this.serviceProvider = controllerServiceProvider; + this.stateLookup = stateLookup; + } @Override public void pause() { @@ -46,6 +63,9 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler { return; } + logger.debug("{} enabling {}", this, toEnable); + enableNow(toEnable); + Connectable connectable; while ((connectable = toStart.poll()) != null) { logger.debug("{} starting {}", this, 
connectable); @@ -57,6 +77,105 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler { return pauseCount.get() > 0; } + + @Override + public void transitionComponentState(final Connectable component, final ScheduledState desiredState) { + final ScheduledState scheduledState = getScheduledState(component); + final ScheduledState finalState = desiredState == null ? ScheduledState.ENABLED : desiredState; + + switch (finalState) { + case DISABLED: + if (scheduledState == ScheduledState.RUNNING) { + logger.debug("Stopping {}", component); + stopComponent(component); + } + + logger.debug("Disabling {}", component); + disable(component); + break; + case ENABLED: + if (scheduledState == ScheduledState.DISABLED) { + logger.debug("Enabling {}", component); + enable(component); + } else if (scheduledState == ScheduledState.RUNNING) { + logger.debug("Stopping {}", component); + stopComponent(component); + } + + break; + case RUNNING: + if (scheduledState == ScheduledState.DISABLED) { + logger.debug("Enabling {}", component); + enable(component); + } + + logger.debug("Starting {}", component); + startComponent(component); + break; + } + } + + private ScheduledState getScheduledState(final Connectable component) { + // Use the State Lookup to get the state, if possible. If, for some reason, it doesn't + // provide us a state (which should never happen) just fall back to the component's scheduled state. 
+ switch (component.getConnectableType()) { + case INPUT_PORT: + case OUTPUT_PORT: + case REMOTE_INPUT_PORT: + case REMOTE_OUTPUT_PORT: + return stateLookup.getState((Port) component); + case PROCESSOR: + return stateLookup.getState((ProcessorNode) component); + case FUNNEL: + return ScheduledState.RUNNING; + } + + switch (component.getScheduledState()) { + case DISABLED: + return ScheduledState.DISABLED; + case RUN_ONCE: + case STOPPED: + case STOPPING: + return ScheduledState.ENABLED; + case RUNNING: + case STARTING: + default: + return ScheduledState.RUNNING; + } + } + + + private void enable(final Connectable component) { + final ProcessGroup group = component.getProcessGroup(); + switch (component.getConnectableType()) { + case INPUT_PORT: + group.enableInputPort((Port) component); + break; + case OUTPUT_PORT: + group.enableOutputPort((Port) component); + break; + case PROCESSOR: + group.enableProcessor((ProcessorNode) component); + break; + } + } + + private void disable(final Connectable component) { + final ProcessGroup group = component.getProcessGroup(); + switch (component.getConnectableType()) { + case INPUT_PORT: + group.disableInputPort((Port) component); + break; + case OUTPUT_PORT: + group.disableOutputPort((Port) component); + break; + case PROCESSOR: + group.disableProcessor((ProcessorNode) component); + break; + } + } + + @Override public void startComponent(final Connectable component) { if (isPaused()) { @@ -68,5 +187,48 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler { } } + @Override + public void stopComponent(final Connectable component) { + final ProcessGroup processGroup = component.getProcessGroup(); + switch (component.getConnectableType()) { + case INPUT_PORT: + processGroup.stopInputPort((Port) component); + break; + case OUTPUT_PORT: + processGroup.stopOutputPort((Port) component); + break; + case PROCESSOR: + processGroup.stopProcessor((ProcessorNode) component); + break; + case REMOTE_INPUT_PORT: + 
case REMOTE_OUTPUT_PORT: + final RemoteGroupPort port = (RemoteGroupPort) component; + port.getRemoteProcessGroup().stopTransmitting(port); + break; + } + } + + @Override + public void enableControllerServicesAsync(final Collection<ControllerServiceNode> controllerServices) { + if (isPaused()) { + logger.debug("{} called to enable {} but paused so will queue them for start later", this, controllerServices); + toEnable.addAll(controllerServices); + } else { + logger.debug("{} enabling {} now", this, controllerServices); + enableNow(controllerServices); + } + } + + @Override + public void disableControllerServicesAsync(final Collection<ControllerServiceNode> controllerServices) { + serviceProvider.disableControllerServicesAsync(controllerServices); + } + + protected ControllerServiceProvider getControllerServiceProvider() { + return serviceProvider; + } + protected abstract void startNow(Connectable component); + + protected abstract void enableNow(Collection<ControllerServiceNode> controllerServices); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java index bc6afcb979..6a38dde93d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java @@ -18,10 +18,22 @@ package org.apache.nifi.groups; import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.flow.ScheduledState; + +import java.util.Collection; public interface ComponentScheduler { void startComponent(Connectable component); + void stopComponent(Connectable 
component); + + void transitionComponentState(Connectable component, ScheduledState desiredState); + + void enableControllerServicesAsync(Collection<ControllerServiceNode> controllerServices); + + void disableControllerServicesAsync(Collection<ControllerServiceNode> controllerServices); + void pause(); void resume(); @@ -32,6 +44,22 @@ public interface ComponentScheduler { public void startComponent(final Connectable component) { } + @Override + public void stopComponent(final Connectable component) { + } + + @Override + public void transitionComponentState(final Connectable component, final ScheduledState desiredState) { + } + + @Override + public void enableControllerServicesAsync(final Collection<ControllerServiceNode> controllerServices) { + } + + @Override + public void disableControllerServicesAsync(final Collection<ControllerServiceNode> controllerServices) { + } + @Override public void pause() { } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/VersionedComponentStateLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/VersionedComponentStateLookup.java index a1ceccd41f..3c855e7749 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/VersionedComponentStateLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/VersionedComponentStateLookup.java @@ -21,7 +21,6 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.flow.ScheduledState; public interface 
VersionedComponentStateLookup { @@ -54,7 +53,7 @@ public interface VersionedComponentStateLookup { @Override public ScheduledState getState(final ControllerServiceNode serviceNode) { - return (serviceNode.getState() == ControllerServiceState.DISABLED || serviceNode.getState() == ControllerServiceState.DISABLING) ? ScheduledState.DISABLED : ScheduledState.ENABLED; + return ScheduledState.DISABLED; } }; @@ -64,7 +63,7 @@ public interface VersionedComponentStateLookup { VersionedComponentStateLookup IDENTITY_LOOKUP = new VersionedComponentStateLookup() { @Override public ScheduledState getState(final ProcessorNode processorNode) { - return map(processorNode.getPhysicalScheduledState()); + return map(processorNode.getDesiredState()); } @Override @@ -91,6 +90,10 @@ public interface VersionedComponentStateLookup { } private ScheduledState map(final org.apache.nifi.controller.ScheduledState componentState) { + if (componentState == null) { + return null; + } + switch (componentState) { case DISABLED: return ScheduledState.DISABLED; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 4c8af1d0db..ec5e438dc6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -168,6 +168,7 @@ import org.apache.nifi.provenance.StandardProvenanceAuthorizableFactory; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup; import 
org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.RemoteGroupPort; @@ -1416,28 +1417,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node readLock.lock(); try { - final ScheduledStateLookup scheduledStateLookup = new ScheduledStateLookup() { - @Override - public ScheduledState getScheduledState(final ProcessorNode procNode) { - if (startConnectablesAfterInitialization.contains(procNode)) { - return ScheduledState.RUNNING; - } - - return procNode.getDesiredState(); - } - - @Override - public ScheduledState getScheduledState(final Port port) { - if (startConnectablesAfterInitialization.contains(port)) { - return ScheduledState.RUNNING; - } - if (startRemoteGroupPortsAfterInitialization.contains(port)) { - return ScheduledState.RUNNING; - } - - return port.getScheduledState(); - } - }; + final ScheduledStateLookup scheduledStateLookup = createScheduledStateLookup(); flowConfiguration = serializer.transform(this, scheduledStateLookup); } finally { @@ -1447,11 +1427,76 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node serializer.serialize(flowConfiguration, os); } + public ScheduledStateLookup createScheduledStateLookup() { + return new ScheduledStateLookup() { + @Override + public ScheduledState getScheduledState(final ProcessorNode procNode) { + if (startConnectablesAfterInitialization.contains(procNode)) { + return ScheduledState.RUNNING; + } + + return procNode.getDesiredState(); + } + + @Override + public ScheduledState getScheduledState(final Port port) { + if (startConnectablesAfterInitialization.contains(port)) { + return ScheduledState.RUNNING; + } + if (startRemoteGroupPortsAfterInitialization.contains(port)) { + return ScheduledState.RUNNING; + } + + return port.getScheduledState(); + } + }; + } + + /** + * Creates a VersionedComponentStateLookup that checks whether or not the given component is 
scheduled to start when the FlowController + * is initialized. If the FlowController has already been initialized or if the given component is not scheduled to start upon FlowController + * initialization, delegates the call to the provided lookup + * + * @param delegate the lookup to delegate calls to if a component is not scheduled to start upon FlowController initialization + * @return the VersionedComponentStateLookup that is created + */ + public VersionedComponentStateLookup createVersionedComponentStateLookup(final VersionedComponentStateLookup delegate) { + return new VersionedComponentStateLookup() { + @Override + public org.apache.nifi.flow.ScheduledState getState(final ProcessorNode processorNode) { + if (isStartAfterInitialization(processorNode)) { + return org.apache.nifi.flow.ScheduledState.RUNNING; + } + + return delegate.getState(processorNode); + } + + @Override + public org.apache.nifi.flow.ScheduledState getState(final Port port) { + if (isStartAfterInitialization(port)) { + return org.apache.nifi.flow.ScheduledState.RUNNING; + } + + return delegate.getState(port); + } + + @Override + public org.apache.nifi.flow.ScheduledState getState(final ReportingTaskNode taskNode) { + return delegate.getState(taskNode); + } + + @Override + public org.apache.nifi.flow.ScheduledState getState(final ControllerServiceNode serviceNode) { + return delegate.getState(serviceNode); + } + }; + } + /** * Synchronizes this controller with the proposed flow. * <p> * For more details, see - * {@link FlowSynchronizer#sync(FlowController, DataFlow, PropertyEncryptor, FlowService)}. + * {@link FlowSynchronizer#sync(FlowController, DataFlow, PropertyEncryptor, FlowService, BundleUpdateStrategy)}. * * @param synchronizer synchronizer * @param dataFlow the flow to load the controller with. 
If the flow is null @@ -1817,6 +1862,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node case REMOTE_OUTPUT_PORT: group.startOutputPort((Port) connectable); break; + case PROCESSOR: + group.startProcessor((ProcessorNode) connectable, true); + break; default: throw new IllegalArgumentException(); } @@ -1847,6 +1895,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node startConnectablesAfterInitialization.remove(connectable); group.stopOutputPort((Port) connectable); break; + case PROCESSOR: + startConnectablesAfterInitialization.remove(connectable); + group.stopProcessor((ProcessorNode) connectable); + break; default: throw new IllegalArgumentException(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java index 14f2799009..1dbf3b5b38 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java @@ -100,6 +100,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -310,7 +311,11 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { final ComponentIdGenerator componentIdGenerator = (proposedId, instanceId, destinationGroupId) -> instanceId; - final ComponentScheduler componentScheduler = new FlowControllerComponentScheduler(controller); + // Use 
a Versioned Component State Lookup that will check to see if the component is scheduled to start upon FlowController initialization. + // Otherwise, fallback to the identity lookup (i.e., use whatever is set on the component itself). + final VersionedComponentStateLookup stateLookup = controller.createVersionedComponentStateLookup(VersionedComponentStateLookup.IDENTITY_LOOKUP); + + final ComponentScheduler componentScheduler = new FlowControllerComponentScheduler(controller, stateLookup); if (rootGroup.isEmpty()) { final VersionedProcessGroup versionedRoot = versionedExternalFlow.getFlowContents(); @@ -343,7 +348,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { final FlowMappingOptions flowMappingOptions = new FlowMappingOptions.Builder() .mapSensitiveConfiguration(true) .mapPropertyDescriptors(false) - .stateLookup(VersionedComponentStateLookup.IDENTITY_LOOKUP) + .stateLookup(stateLookup) .sensitiveValueEncryptor(encryptor::encrypt) .componentIdLookup(ComponentIdLookup.VERSIONED_OR_GENERATE) .mapInstanceIdentifiers(true) @@ -985,7 +990,8 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { private static class FlowControllerComponentScheduler extends AbstractComponentScheduler implements ComponentScheduler { private final FlowController flowController; - public FlowControllerComponentScheduler(final FlowController flowController) { + public FlowControllerComponentScheduler(final FlowController flowController, final VersionedComponentStateLookup stateLookup) { + super(flowController.getControllerServiceProvider(), stateLookup); this.flowController = flowController; } @@ -1005,5 +1011,15 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { break; } } + + @Override + public void stopComponent(final Connectable component) { + flowController.stopConnectable(component); + } + + @Override + protected void enableNow(final Collection<ControllerServiceNode> controllerServices) { + 
flowController.getControllerServiceProvider().enableControllerServices(controllerServices); + } } }
