This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 126ab0b8d8ff3f14dfb56d1c8ec6fb71351e4234
Author: markap14 <[email protected]>
AuthorDate: Fri Apr 1 14:28:38 2022 -0400

    NIFI-9853: Refactored StandardProcessGroupSynchronizer to make use of… 
(#5919)
    
    * NIFI-9853: Refactored StandardProcessGroupSynchronizer to make use of 
State Lookups and Compoennt Schedulers to ensure that we properly synchronize 
states when starting up, when exporting flow definitions, and when importing 
Flow Definitions
    
    * NIFI-9853: Fixed NPE
---
 .../nifi/groups/DefaultComponentScheduler.java     |  15 ++
 .../RetainExistingStateComponentScheduler.java     | 145 ++++++++++++++++++
 .../apache/nifi/groups/StandardProcessGroup.java   |   8 +-
 .../groups/StandardProcessGroupSynchronizer.java   |  84 +----------
 .../nifi/groups/AbstractComponentScheduler.java    | 162 +++++++++++++++++++++
 .../org/apache/nifi/groups/ComponentScheduler.java |  28 ++++
 .../mapping/VersionedComponentStateLookup.java     |   9 +-
 .../org/apache/nifi/controller/FlowController.java |  98 ++++++++++---
 .../serialization/VersionedFlowSynchronizer.java   |  22 ++-
 9 files changed, 460 insertions(+), 111 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
index ee7fa9b003..a54ec61ab6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
@@ -20,9 +20,19 @@ package org.apache.nifi.groups;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
 import org.apache.nifi.remote.RemoteGroupPort;
 
+import java.util.Collection;
+
 public class DefaultComponentScheduler extends AbstractComponentScheduler {
+
+    public DefaultComponentScheduler(final ControllerServiceProvider 
controllerServiceProvider, final VersionedComponentStateLookup stateLookup) {
+        super(controllerServiceProvider, stateLookup);
+    }
+
     @Override
     protected void startNow(final Connectable component) {
         switch (component.getConnectableType()) {
@@ -48,4 +58,9 @@ public class DefaultComponentScheduler extends 
AbstractComponentScheduler {
             }
         }
     }
+
+    @Override
+    protected void enableNow(final Collection<ControllerServiceNode> 
controllerServices) {
+        
getControllerServiceProvider().enableControllerServices(controllerServices);
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
new file mode 100644
index 0000000000..b6ed741889
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.Triggerable;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class RetainExistingStateComponentScheduler implements 
ComponentScheduler {
+    private static final Logger logger = 
LoggerFactory.getLogger(RetainExistingStateComponentScheduler.class);
+
+    private final ComponentScheduler delegate;
+    private final Map<String, ScheduledState> connectableStates;
+    private final Map<String, ControllerServiceState> controllerServiceStates;
+
+    public RetainExistingStateComponentScheduler(final ProcessGroup 
processGroup, final ComponentScheduler delegate) {
+        this.delegate = delegate;
+        this.connectableStates = mapConnectableStates(processGroup);
+        this.controllerServiceStates = 
mapControllerServiceStates(processGroup);
+    }
+
+    @Override
+    public void startComponent(final Connectable component) {
+        final ScheduledState existingState = 
connectableStates.get(component.getIdentifier());
+        if (existingState == null) {
+            logger.debug("Will not start {} because it was not previously 
known in this Process Group", component);
+            return;
+        }
+
+        if (existingState != ScheduledState.RUNNING && existingState != 
ScheduledState.STARTING) {
+            logger.debug("Will not start {} because its previous state was 
{}", component, existingState);
+            return;
+        }
+
+        logger.debug("Starting {}", component);
+        delegate.startComponent(component);
+    }
+
+    @Override
+    public void stopComponent(final Connectable component) {
+        delegate.stopComponent(component);
+    }
+
+    @Override
+    public void transitionComponentState(final Connectable component, final 
org.apache.nifi.flow.ScheduledState desiredState) {
+        delegate.transitionComponentState(component, desiredState);
+    }
+
+    @Override
+    public void enableControllerServicesAsync(final 
Collection<ControllerServiceNode> controllerServices) {
+        final Set<ControllerServiceNode> toEnable = new HashSet<>();
+
+        for (final ControllerServiceNode service : controllerServices) {
+            final ControllerServiceState existingState = 
controllerServiceStates.get(service.getIdentifier());
+
+            if (existingState == null) {
+                logger.debug("Will not enable {} because it was not previously 
known in this Process Group", service);
+                continue;
+            }
+
+            if (existingState != ControllerServiceState.ENABLED && 
existingState != ControllerServiceState.ENABLING) {
+                logger.debug("Will not enable {} because its previously state 
was {}", service, existingState);
+                continue;
+            }
+
+            toEnable.add(service);
+        }
+
+        logger.debug("Enabling {}", toEnable);
+        delegate.enableControllerServicesAsync(toEnable);
+    }
+
+    @Override
+    public void disableControllerServicesAsync(final 
Collection<ControllerServiceNode> controllerServices) {
+        delegate.disableControllerServicesAsync(controllerServices);
+    }
+
+    @Override
+    public void pause() {
+        delegate.pause();
+    }
+
+    @Override
+    public void resume() {
+        delegate.resume();
+    }
+
+    private Map<String, ControllerServiceState> 
mapControllerServiceStates(final ProcessGroup group) {
+        final Set<ControllerServiceNode> services = 
group.findAllControllerServices();
+        final Map<String, ControllerServiceState> serviceStates = 
services.stream()
+            .collect(Collectors.toMap(ControllerServiceNode::getIdentifier, 
ControllerServiceNode::getState));
+
+        return serviceStates;
+    }
+
+    private Map<String, ScheduledState> mapConnectableStates(final 
ProcessGroup group) {
+        final Set<Connectable> connectables = new HashSet<>();
+        findAllConnectables(group, connectables);
+
+        final Map<String, ScheduledState> connectableStates = 
connectables.stream()
+            .collect(Collectors.toMap(Connectable::getIdentifier, 
Triggerable::getScheduledState));
+
+        return connectableStates;
+    }
+
+    private void findAllConnectables(final ProcessGroup group, final 
Set<Connectable> connectables) {
+        connectables.addAll(group.getInputPorts());
+        connectables.addAll(group.getOutputPorts());
+        connectables.addAll(group.getFunnels());
+        connectables.addAll(group.getProcessors());
+        for (final RemoteProcessGroup remoteGroup : 
group.getRemoteProcessGroups()) {
+            connectables.addAll(remoteGroup.getInputPorts());
+            connectables.addAll(remoteGroup.getOutputPorts());
+        }
+
+        for (final ProcessGroup childGroup : group.getProcessGroups()) {
+            findAllConnectables(childGroup, connectables);
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 58505f9cd5..1924f9bcfb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -3783,11 +3783,13 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                            final boolean updateDescendantVersionedFlows) {
 
         final ComponentIdGenerator idGenerator = (proposedId, instanceId, 
destinationGroupId) -> generateUuid(proposedId, destinationGroupId, 
componentIdSeed);
-        final ComponentScheduler componentScheduler = new 
DefaultComponentScheduler();
+        final VersionedComponentStateLookup stateLookup = 
VersionedComponentStateLookup.ENABLED_OR_DISABLED;
+        final ComponentScheduler defaultComponentScheduler = new 
DefaultComponentScheduler(controllerServiceProvider, stateLookup);
+        final ComponentScheduler retainExistingStateScheduler = new 
RetainExistingStateComponentScheduler(this, defaultComponentScheduler);
 
         final GroupSynchronizationOptions synchronizationOptions = new 
GroupSynchronizationOptions.Builder()
             .componentIdGenerator(idGenerator)
-            .componentScheduler(componentScheduler)
+            .componentScheduler(retainExistingStateScheduler)
             .ignoreLocalModifications(!verifyNotDirty)
             .updateDescendantVersionedFlows(updateDescendantVersionedFlows)
             .updateGroupSettings(updateSettings)
@@ -3800,7 +3802,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         final FlowMappingOptions flowMappingOptions = new 
FlowMappingOptions.Builder()
             .mapSensitiveConfiguration(false)
             .mapPropertyDescriptors(true)
-            .stateLookup(VersionedComponentStateLookup.ENABLED_OR_DISABLED)
+            .stateLookup(stateLookup)
             .sensitiveValueEncryptor(null)
             .componentIdLookup(ComponentIdLookup.VERSIONED_OR_GENERATE)
             .mapInstanceIdentifiers(false)
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
index 3761f65f73..cefaf22ada 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
@@ -502,8 +502,7 @@ public class StandardProcessGroupSynchronizer implements 
ProcessGroupSynchronize
         // therefore is not valid until all have been created.
         toEnable.forEach(service -> {
             if (service.getState() == ControllerServiceState.DISABLED) {
-                LOG.debug("Enabling {}", service);
-                
context.getControllerServiceProvider().enableControllerServicesAsync(Collections.singleton(service));
+                
context.getComponentScheduler().enableControllerServicesAsync(Collections.singleton(service));
             }
         });
     }
@@ -1447,52 +1446,7 @@ public class StandardProcessGroupSynchronizer implements 
ProcessGroupSynchronize
         port.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
         
port.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
 
-        final org.apache.nifi.flow.ScheduledState scheduledState = 
proposed.getScheduledState() == null ? 
org.apache.nifi.flow.ScheduledState.ENABLED : proposed.getScheduledState();
-
-        final ProcessGroup group = port.getProcessGroup();
-        if (port.getConnectableType() == ConnectableType.INPUT_PORT) {
-            switch (scheduledState) {
-                case DISABLED:
-                    group.disableInputPort(port);
-                    break;
-                case ENABLED:
-                    if (port.getScheduledState() == ScheduledState.DISABLED) {
-                        group.enableInputPort(port);
-                    } else if (port.getScheduledState() == 
ScheduledState.RUNNING) {
-                        group.stopInputPort(port);
-                    }
-                    break;
-                case RUNNING:
-                    if (port.getScheduledState() == ScheduledState.DISABLED) {
-                        group.enableInputPort(port);
-                    }
-                    if (port.getScheduledState() == ScheduledState.STOPPED) {
-                        context.getComponentScheduler().startComponent(port);
-                    }
-                    break;
-            }
-        } else if (port.getConnectableType() == ConnectableType.OUTPUT_PORT) {
-            switch (scheduledState) {
-                case DISABLED:
-                    group.disableOutputPort(port);
-                    break;
-                case ENABLED:
-                    if (port.getScheduledState() == ScheduledState.DISABLED) {
-                        group.enableOutputPort(port);
-                    } else if (port.getScheduledState() == 
ScheduledState.RUNNING) {
-                        group.stopOutputPort(port);
-                    }
-                    break;
-                case RUNNING:
-                    if (port.getScheduledState() == ScheduledState.DISABLED) {
-                        group.enableOutputPort(port);
-                    }
-                    if (port.getScheduledState() == ScheduledState.STOPPED) {
-                        context.getComponentScheduler().startComponent(port);
-                    }
-                    break;
-            }
-        }
+        context.getComponentScheduler().transitionComponentState(port, 
proposed.getScheduledState());
     }
 
     private Port addInputPort(final ProcessGroup destination, final 
VersionedPort proposed, final ComponentIdGenerator componentIdGenerator, final 
String temporaryName) {
@@ -1606,36 +1560,8 @@ public class StandardProcessGroupSynchronizer implements 
ProcessGroupSynchronize
                 
processor.setBackoffMechanism(BackoffMechanism.valueOf(proposed.getBackoffMechanism()));
             }
 
-            final ScheduledState procState = processor.getScheduledState();
-            final ProcessGroup group = processor.getProcessGroup();
-            switch (proposed.getScheduledState()) {
-                case DISABLED:
-                    if (procState == ScheduledState.RUNNING) {
-                        LOG.debug("Stopping {}", processor);
-                        group.stopProcessor(processor);
-                    }
-
-                    LOG.debug("Disabling {}", processor);
-                    group.disableProcessor(processor);
-                    break;
-                case ENABLED:
-                    if (procState == ScheduledState.DISABLED) {
-                        LOG.debug("Enabling {}", processor);
-                        group.enableProcessor(processor);
-                    } else if (procState == ScheduledState.RUNNING) {
-                        LOG.debug("Stopping {}", processor);
-                        group.stopProcessor(processor);
-                    }
-                    break;
-                case RUNNING:
-                    if (procState == ScheduledState.DISABLED) {
-                        LOG.debug("Enabling {}", processor);
-                        group.enableProcessor(processor);
-                    }
-                    LOG.debug("Starting {}", processor);
-                    context.getComponentScheduler().startComponent(processor);
-                    break;
-            }
+            // Transition state to disabled/enabled/running
+            
context.getComponentScheduler().transitionComponentState(processor, 
proposed.getScheduledState());
 
             if (!isEqual(processor.getBundleCoordinate(), 
proposed.getBundle())) {
                 final BundleCoordinate newBundleCoordinate = 
toCoordinate(proposed.getBundle());
@@ -1751,7 +1677,7 @@ public class StandardProcessGroupSynchronizer implements 
ProcessGroupSynchronize
             }
         } else {
             if (portState == ScheduledState.RUNNING) {
-                
remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort);
+                context.getComponentScheduler().stopComponent(remoteGroupPort);
             }
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
index 3add0ac472..d1f6cc1ac0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
@@ -18,9 +18,17 @@
 package org.apache.nifi.groups;
 
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
+import org.apache.nifi.remote.RemoteGroupPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -28,8 +36,17 @@ import java.util.concurrent.atomic.AtomicLong;
 public abstract class AbstractComponentScheduler implements ComponentScheduler 
{
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractComponentScheduler.class);
 
+    private final ControllerServiceProvider serviceProvider;
+    private final VersionedComponentStateLookup stateLookup;
+
     private final AtomicLong pauseCount = new AtomicLong(0L);
     private final Queue<Connectable> toStart = new LinkedBlockingQueue<>();
+    private final Queue<ControllerServiceNode> toEnable = new 
LinkedBlockingQueue<>();
+
+    public AbstractComponentScheduler(final ControllerServiceProvider 
controllerServiceProvider, final VersionedComponentStateLookup stateLookup) {
+        this.serviceProvider = controllerServiceProvider;
+        this.stateLookup = stateLookup;
+    }
 
     @Override
     public void pause() {
@@ -46,6 +63,9 @@ public abstract class AbstractComponentScheduler implements 
ComponentScheduler {
             return;
         }
 
+        logger.debug("{} enabling {}", this, toEnable);
+        enableNow(toEnable);
+
         Connectable connectable;
         while ((connectable = toStart.poll()) != null) {
             logger.debug("{} starting {}", this, connectable);
@@ -57,6 +77,105 @@ public abstract class AbstractComponentScheduler implements 
ComponentScheduler {
         return pauseCount.get() > 0;
     }
 
+
+    @Override
+    public void transitionComponentState(final Connectable component, final 
ScheduledState desiredState) {
+        final ScheduledState scheduledState = getScheduledState(component);
+        final ScheduledState finalState = desiredState == null ? 
ScheduledState.ENABLED : desiredState;
+
+        switch (finalState) {
+            case DISABLED:
+                if (scheduledState == ScheduledState.RUNNING) {
+                    logger.debug("Stopping {}", component);
+                    stopComponent(component);
+                }
+
+                logger.debug("Disabling {}", component);
+                disable(component);
+                break;
+            case ENABLED:
+                if (scheduledState == ScheduledState.DISABLED) {
+                    logger.debug("Enabling {}", component);
+                    enable(component);
+                } else if (scheduledState == ScheduledState.RUNNING) {
+                    logger.debug("Stopping {}", component);
+                    stopComponent(component);
+                }
+
+                break;
+            case RUNNING:
+                if (scheduledState == ScheduledState.DISABLED) {
+                    logger.debug("Enabling {}", component);
+                    enable(component);
+                }
+
+                logger.debug("Starting {}", component);
+                startComponent(component);
+                break;
+        }
+    }
+
+    private ScheduledState getScheduledState(final Connectable component) {
+        // Use the State Lookup to get the state, if possible. If, for some 
reason, it doesn't
+        // provide us a state (which should never happen) just fall back to 
the component's scheduled state.
+        switch (component.getConnectableType()) {
+            case INPUT_PORT:
+            case OUTPUT_PORT:
+            case REMOTE_INPUT_PORT:
+            case REMOTE_OUTPUT_PORT:
+                return stateLookup.getState((Port) component);
+            case PROCESSOR:
+                return stateLookup.getState((ProcessorNode) component);
+            case FUNNEL:
+                return ScheduledState.RUNNING;
+        }
+
+        switch (component.getScheduledState()) {
+            case DISABLED:
+                return ScheduledState.DISABLED;
+            case RUN_ONCE:
+            case STOPPED:
+            case STOPPING:
+                return ScheduledState.ENABLED;
+            case RUNNING:
+            case STARTING:
+            default:
+                return ScheduledState.RUNNING;
+        }
+    }
+
+
+    private void enable(final Connectable component) {
+        final ProcessGroup group = component.getProcessGroup();
+        switch (component.getConnectableType()) {
+            case INPUT_PORT:
+                group.enableInputPort((Port) component);
+                break;
+            case OUTPUT_PORT:
+                group.enableOutputPort((Port) component);
+                break;
+            case PROCESSOR:
+                group.enableProcessor((ProcessorNode) component);
+                break;
+        }
+    }
+
+    private void disable(final Connectable component) {
+        final ProcessGroup group = component.getProcessGroup();
+        switch (component.getConnectableType()) {
+            case INPUT_PORT:
+                group.disableInputPort((Port) component);
+                break;
+            case OUTPUT_PORT:
+                group.disableOutputPort((Port) component);
+                break;
+            case PROCESSOR:
+                group.disableProcessor((ProcessorNode) component);
+                break;
+        }
+    }
+
+
     @Override
     public void startComponent(final Connectable component) {
         if (isPaused()) {
@@ -68,5 +187,48 @@ public abstract class AbstractComponentScheduler implements 
ComponentScheduler {
         }
     }
 
+    @Override
+    public void stopComponent(final Connectable component) {
+        final ProcessGroup processGroup = component.getProcessGroup();
+        switch (component.getConnectableType()) {
+            case INPUT_PORT:
+                processGroup.stopInputPort((Port) component);
+                break;
+            case OUTPUT_PORT:
+                processGroup.stopOutputPort((Port) component);
+                break;
+            case PROCESSOR:
+                processGroup.stopProcessor((ProcessorNode) component);
+                break;
+            case REMOTE_INPUT_PORT:
+            case REMOTE_OUTPUT_PORT:
+                final RemoteGroupPort port = (RemoteGroupPort) component;
+                port.getRemoteProcessGroup().stopTransmitting(port);
+                break;
+        }
+    }
+
+    @Override
+    public void enableControllerServicesAsync(final 
Collection<ControllerServiceNode> controllerServices) {
+        if (isPaused()) {
+            logger.debug("{} called to enable {} but paused so will queue them 
for start later", this, controllerServices);
+            toEnable.addAll(controllerServices);
+        } else {
+            logger.debug("{} enabling {} now", this, controllerServices);
+            enableNow(controllerServices);
+        }
+    }
+
+    @Override
+    public void disableControllerServicesAsync(final 
Collection<ControllerServiceNode> controllerServices) {
+        serviceProvider.disableControllerServicesAsync(controllerServices);
+    }
+
+    protected ControllerServiceProvider getControllerServiceProvider() {
+        return serviceProvider;
+    }
+
     protected abstract void startNow(Connectable component);
+
+    protected abstract void enableNow(Collection<ControllerServiceNode> 
controllerServices);
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
index bc6afcb979..6a38dde93d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
@@ -18,10 +18,22 @@
 package org.apache.nifi.groups;
 
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.ScheduledState;
+
+import java.util.Collection;
 
 public interface ComponentScheduler {
     void startComponent(Connectable component);
 
+    void stopComponent(Connectable component);
+
+    void transitionComponentState(Connectable component, ScheduledState 
desiredState);
+
+    void enableControllerServicesAsync(Collection<ControllerServiceNode> 
controllerServices);
+
+    void disableControllerServicesAsync(Collection<ControllerServiceNode> 
controllerServices);
+
     void pause();
 
     void resume();
@@ -32,6 +44,22 @@ public interface ComponentScheduler {
         public void startComponent(final Connectable component) {
         }
 
+        @Override
+        public void stopComponent(final Connectable component) {
+        }
+
+        @Override
+        public void transitionComponentState(final Connectable component, 
final ScheduledState desiredState) {
+        }
+
+        @Override
+        public void enableControllerServicesAsync(final 
Collection<ControllerServiceNode> controllerServices) {
+        }
+
+        @Override
+        public void disableControllerServicesAsync(final 
Collection<ControllerServiceNode> controllerServices) {
+        }
+
         @Override
         public void pause() {
         }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/VersionedComponentStateLookup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/VersionedComponentStateLookup.java
index a1ceccd41f..3c855e7749 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/VersionedComponentStateLookup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/VersionedComponentStateLookup.java
@@ -21,7 +21,6 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.flow.ScheduledState;
 
 public interface VersionedComponentStateLookup {
@@ -54,7 +53,7 @@ public interface VersionedComponentStateLookup {
 
         @Override
         public ScheduledState getState(final ControllerServiceNode 
serviceNode) {
-            return (serviceNode.getState() == ControllerServiceState.DISABLED 
|| serviceNode.getState() == ControllerServiceState.DISABLING) ? 
ScheduledState.DISABLED : ScheduledState.ENABLED;
+            return ScheduledState.DISABLED;
         }
     };
 
@@ -64,7 +63,7 @@ public interface VersionedComponentStateLookup {
     VersionedComponentStateLookup IDENTITY_LOOKUP = new 
VersionedComponentStateLookup() {
         @Override
         public ScheduledState getState(final ProcessorNode processorNode) {
-            return map(processorNode.getPhysicalScheduledState());
+            return map(processorNode.getDesiredState());
         }
 
         @Override
@@ -91,6 +90,10 @@ public interface VersionedComponentStateLookup {
         }
 
         private ScheduledState map(final 
org.apache.nifi.controller.ScheduledState componentState) {
+            if (componentState == null) {
+                return null;
+            }
+
             switch (componentState) {
                 case DISABLED:
                     return ScheduledState.DISABLED;
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4c8af1d0db..ec5e438dc6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -168,6 +168,7 @@ import 
org.apache.nifi.provenance.StandardProvenanceAuthorizableFactory;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
 import org.apache.nifi.remote.RemoteGroupPort;
@@ -1416,28 +1417,7 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
 
         readLock.lock();
         try {
-            final ScheduledStateLookup scheduledStateLookup = new 
ScheduledStateLookup() {
-                @Override
-                public ScheduledState getScheduledState(final ProcessorNode 
procNode) {
-                    if 
(startConnectablesAfterInitialization.contains(procNode)) {
-                        return ScheduledState.RUNNING;
-                    }
-
-                    return procNode.getDesiredState();
-                }
-
-                @Override
-                public ScheduledState getScheduledState(final Port port) {
-                    if (startConnectablesAfterInitialization.contains(port)) {
-                        return ScheduledState.RUNNING;
-                    }
-                    if 
(startRemoteGroupPortsAfterInitialization.contains(port)) {
-                        return ScheduledState.RUNNING;
-                    }
-
-                    return port.getScheduledState();
-                }
-            };
+            final ScheduledStateLookup scheduledStateLookup = 
createScheduledStateLookup();
 
             flowConfiguration = serializer.transform(this, 
scheduledStateLookup);
         } finally {
@@ -1447,11 +1427,76 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
         serializer.serialize(flowConfiguration, os);
     }
 
+    public ScheduledStateLookup createScheduledStateLookup() {
+        return new ScheduledStateLookup() {
+            @Override
+            public ScheduledState getScheduledState(final ProcessorNode 
procNode) {
+                if (startConnectablesAfterInitialization.contains(procNode)) {
+                    return ScheduledState.RUNNING;
+                }
+
+                return procNode.getDesiredState();
+            }
+
+            @Override
+            public ScheduledState getScheduledState(final Port port) {
+                if (startConnectablesAfterInitialization.contains(port)) {
+                    return ScheduledState.RUNNING;
+                }
+                if (startRemoteGroupPortsAfterInitialization.contains(port)) {
+                    return ScheduledState.RUNNING;
+                }
+
+                return port.getScheduledState();
+            }
+        };
+    }
+
+    /**
+     * Creates a VersionedComponentStateLookup that checks whether or not the 
given component is scheduled to start when the FlowController
+     * is initialized. If the FlowController has already been initialized or 
if the given component is not scheduled to start upon FlowController
+     * initialization, delegates the call to the provided lookup
+     *
+     * @param delegate the lookup to delegate calls to if a component is not 
scheduled to start upon FlowController initialization
+     * @return the VersionedComponentStateLookup that is created
+     */
+    public VersionedComponentStateLookup 
createVersionedComponentStateLookup(final VersionedComponentStateLookup 
delegate) {
+        return new VersionedComponentStateLookup() {
+            @Override
+            public org.apache.nifi.flow.ScheduledState getState(final 
ProcessorNode processorNode) {
+                if (isStartAfterInitialization(processorNode)) {
+                    return org.apache.nifi.flow.ScheduledState.RUNNING;
+                }
+
+                return delegate.getState(processorNode);
+            }
+
+            @Override
+            public org.apache.nifi.flow.ScheduledState getState(final Port 
port) {
+                if (isStartAfterInitialization(port)) {
+                    return org.apache.nifi.flow.ScheduledState.RUNNING;
+                }
+
+                return delegate.getState(port);
+            }
+
+            @Override
+            public org.apache.nifi.flow.ScheduledState getState(final 
ReportingTaskNode taskNode) {
+                return delegate.getState(taskNode);
+            }
+
+            @Override
+            public org.apache.nifi.flow.ScheduledState getState(final 
ControllerServiceNode serviceNode) {
+                return delegate.getState(serviceNode);
+            }
+        };
+    }
+
     /**
      * Synchronizes this controller with the proposed flow.
      * <p>
      * For more details, see
-     * {@link FlowSynchronizer#sync(FlowController, DataFlow, 
PropertyEncryptor, FlowService)}.
+     * {@link FlowSynchronizer#sync(FlowController, DataFlow, 
PropertyEncryptor, FlowService, BundleUpdateStrategy)}.
      *
      * @param synchronizer synchronizer
      * @param dataFlow the flow to load the controller with. If the flow is 
null
@@ -1817,6 +1862,9 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
                     case REMOTE_OUTPUT_PORT:
                         group.startOutputPort((Port) connectable);
                         break;
+                    case PROCESSOR:
+                        group.startProcessor((ProcessorNode) connectable,  
true);
+                        break;
                     default:
                         throw new IllegalArgumentException();
                 }
@@ -1847,6 +1895,10 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
                     startConnectablesAfterInitialization.remove(connectable);
                     group.stopOutputPort((Port) connectable);
                     break;
+                case PROCESSOR:
+                    startConnectablesAfterInitialization.remove(connectable);
+                    group.stopProcessor((ProcessorNode) connectable);
+                    break;
                 default:
                     throw new IllegalArgumentException();
             }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 14f2799009..1dbf3b5b38 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -100,6 +100,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -310,7 +311,11 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
 
                 final ComponentIdGenerator componentIdGenerator = (proposedId, 
instanceId, destinationGroupId) -> instanceId;
 
-                final ComponentScheduler componentScheduler = new 
FlowControllerComponentScheduler(controller);
+                // Use a Versioned Component State Lookup that will check to 
see if the component is scheduled to start upon FlowController initialization.
+                // Otherwise, fallback to the identity lookup (i.e., use 
whatever is set on the component itself).
+                final VersionedComponentStateLookup stateLookup = 
controller.createVersionedComponentStateLookup(VersionedComponentStateLookup.IDENTITY_LOOKUP);
+
+                final ComponentScheduler componentScheduler = new 
FlowControllerComponentScheduler(controller, stateLookup);
 
                 if (rootGroup.isEmpty()) {
                     final VersionedProcessGroup versionedRoot = 
versionedExternalFlow.getFlowContents();
@@ -343,7 +348,7 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
                 final FlowMappingOptions flowMappingOptions = new 
FlowMappingOptions.Builder()
                     .mapSensitiveConfiguration(true)
                     .mapPropertyDescriptors(false)
-                    .stateLookup(VersionedComponentStateLookup.IDENTITY_LOOKUP)
+                    .stateLookup(stateLookup)
                     .sensitiveValueEncryptor(encryptor::encrypt)
                     .componentIdLookup(ComponentIdLookup.VERSIONED_OR_GENERATE)
                     .mapInstanceIdentifiers(true)
@@ -985,7 +990,8 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
     private static class FlowControllerComponentScheduler extends 
AbstractComponentScheduler implements ComponentScheduler {
         private final FlowController flowController;
 
-        public FlowControllerComponentScheduler(final FlowController 
flowController) {
+        public FlowControllerComponentScheduler(final FlowController 
flowController, final VersionedComponentStateLookup stateLookup) {
+            super(flowController.getControllerServiceProvider(), stateLookup);
             this.flowController = flowController;
         }
 
@@ -1005,5 +1011,15 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
                     break;
             }
         }
+
+        @Override
+        public void stopComponent(final Connectable component) {
+            flowController.stopConnectable(component);
+        }
+
+        @Override
+        protected void enableNow(final Collection<ControllerServiceNode> 
controllerServices) {
+            
flowController.getControllerServiceProvider().enableControllerServices(controllerServices);
+        }
     }
 }

Reply via email to