This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1be32d6992 NIFI-14654 Fixed Stateless Process Group session lifecycle 
handling (#10014)
1be32d6992 is described below

commit 1be32d6992acf40bc63f69a57cf8b72eedc42cd7
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jun 13 15:44:44 2025 -0400

    NIFI-14654 Fixed Stateless Process Group session lifecycle handling (#10014)
    
    Ensure that when a Stateless Process Group is stopped, we properly call all 
success and/or failure session callbacks before stopping the Processor. To 
make the system tests that verify this more straightforward, also exposed 
the CounterRepository so that system tests can obtain counter values to 
ensure that methods are called appropriately.
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/controller/StandardProcessorNode.java     | 103 +++++----
 .../apache/nifi/groups/StandardProcessGroup.java   |  21 +-
 .../apache/nifi/controller/ProcessScheduler.java   |   6 +-
 .../org/apache/nifi/controller/ProcessorNode.java  |  17 +-
 .../nifi/controller/scheduling/LifecycleState.java |  17 +-
 .../java/org/apache/nifi/groups/ProcessGroup.java  |  13 +-
 .../lifecycle/ProcessorStopLifecycleMethods.java   |  61 ++++++
 .../flow/StandardStatelessGroupNodeFactory.java    |   7 +-
 .../scheduling/StandardProcessScheduler.java       |  29 ++-
 .../nifi/groups/StandardStatelessGroupNode.java    |   6 +-
 .../scheduling/TestStandardProcessScheduler.java   |   3 +-
 .../controller/service/mock/MockProcessGroup.java  |   3 +-
 .../nifi/stateless/flow/StatelessDataflow.java     |  17 +-
 .../scheduling/StatelessProcessScheduler.java      |  39 +++-
 .../nifi/stateless/engine/ExecutionProgress.java   |   7 +-
 .../engine/StandardExecutionProgress.java          |   9 +-
 .../stateless/engine/StandardStatelessEngine.java  |  45 ++--
 .../flow/StandardStatelessDataflowFactory.java     |   8 +-
 .../nifi/stateless/flow/StandardStatelessFlow.java | 234 +++++++++++++--------
 .../repository/RepositoryContextFactory.java       |   3 +
 .../StatelessRepositoryContextFactory.java         |   7 +-
 .../stateless/session/StatelessProcessSession.java |   6 +-
 .../apache/nifi/stateless/StatelessSystemIT.java   |   3 +-
 .../stateless/basics/ProcessorLifecycleIT.java     | 172 ++++++++++++++-
 .../tests/system/GenerateAndCountCallbacks.java    | 105 +++++++++
 .../apache/nifi/processors/tests/system/Sleep.java |  76 +++++--
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../tests/system/stateless/StatelessBasicsIT.java  |   2 +-
 28 files changed, 790 insertions(+), 230 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 1eb7a05dee..a1ac2a1205 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -56,6 +56,7 @@ import 
org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.tasks.ActiveTask;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepositoryFactory;
@@ -92,6 +93,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.support.CronExpression;
 
+import java.lang.annotation.Annotation;
 import java.lang.management.ThreadInfo;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -939,7 +941,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
         // When getScheduledState() is called, we map the 'physical' state of 
STOPPING to STOPPED. This is done in order to maintain
         // backward compatibility because the UI and other clients will not 
know of the (relatively newer) 'STOPPING' state.
-        // Because of there previously was no STOPPING state, the way to 
determine of a processor had truly stopped was to check if its
+        // Because there previously was no STOPPING state, the way to 
determine if a processor had truly stopped was to check if its
         // Scheduled State was STOPPED AND it had no active threads.
         //
         // Also, we can have a situation in which a processor is started while 
invalid. Before the processor becomes valid, it can be stopped.
@@ -1800,18 +1802,24 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
      */
     @Override
     public CompletableFuture<Void> stop(final ProcessScheduler 
processScheduler, final ScheduledExecutorService executor, final ProcessContext 
processContext,
-            final SchedulingAgent schedulingAgent, final LifecycleState 
lifecycleState, final boolean triggerLifecycleMethods) {
+            final SchedulingAgent schedulingAgent, final LifecycleState 
lifecycleState, final ProcessorStopLifecycleMethods lifecycleMethods) {
 
         final Processor processor = processorRef.get().getProcessor();
         LOG.debug("Stopping processor: {}", this);
         setDesiredState(ScheduledState.STOPPED);
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
-
         addStopFuture(future);
 
         // will ensure that the Processor represented by this node can only be 
stopped once
         if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, 
ScheduledState.STOPPING) || 
this.scheduledState.compareAndSet(ScheduledState.RUN_ONCE, 
ScheduledState.STOPPING)) {
+            if (!lifecycleMethods.isTriggerOnUnscheduled() && 
!lifecycleMethods.isTriggerOnStopped()) {
+                // If we do not need to trigger either of the lifecycle 
methods, we can simply call the complete action and be done
+                completeStopAction();
+                future.complete(null);
+                return future;
+            }
+
             lifecycleState.incrementActiveThreadCount(null);
 
             // will continue to monitor active threads, invoking OnStopped 
once there are no
@@ -1823,17 +1831,11 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
                         if (lifecycleState.isScheduled()) {
                             
schedulingAgent.unschedule(StandardProcessorNode.this, lifecycleState);
 
-                            if (triggerLifecycleMethods) {
+                            if (lifecycleMethods.isTriggerOnUnscheduled()) {
                                 LOG.debug("Triggering @OnUnscheduled methods 
of {}", this);
-
-                                activateThread();
-                                try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(getExtensionManager(), 
processor.getClass(), processor.getIdentifier())) {
-                                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
-                                } finally {
-                                    deactivateThread();
-                                }
+                                triggerOnUnscheduled(processContext);
                             } else {
-                                LOG.debug("Will not trigger @OnUnscheduled 
methods of {} because triggerLifecycleState = false", this);
+                                LOG.debug("Will not trigger @OnUnscheduled 
methods of {} because ProcessorStopLifecycleMethods.isTriggerOnUnscheduled() = 
false", this);
                             }
                         }
 
@@ -1841,38 +1843,35 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
                         // performing the lifecycle actions counts as 1 thread.
                         final boolean allThreadsComplete = 
lifecycleState.getActiveThreadCount() == 1;
                         if (allThreadsComplete) {
-                            if (triggerLifecycleMethods) {
-                                LOG.debug("Triggering @OnStopped methods of 
{}", this);
-
-                                activateThread();
-                                try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(getExtensionManager(), 
processor.getClass(), processor.getIdentifier())) {
-                                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, 
processContext);
-                                } finally {
-                                    deactivateThread();
-                                }
-                            } else {
-                                LOG.debug("Will not trigger @OnStopped methods 
of {} because triggerLifecycleState = false", this);
-                            }
+                            try {
+                                if (lifecycleMethods.isTriggerOnStopped()) {
+                                    LOG.debug("Triggering @OnStopped methods 
of {}", this);
 
-                            lifecycleState.decrementActiveThreadCount();
-                            completeStopAction();
-
-                            // This can happen only when we join a cluster. In 
such a case, we can inherit a flow from the cluster that says that
-                            // the Processor is to be running. However, if the 
Processor is already in the process of stopping, we cannot immediately
-                            // start running the Processor. As a result, we 
check here, since the Processor is stopped, and then immediately start the
-                            // Processor if need be.
-                            final ScheduledState desired = 
StandardProcessorNode.this.desiredState;
-                            if (desired == ScheduledState.RUNNING) {
-                                LOG.info("Finished stopping {} but desired 
state is now RUNNING so will start processor", this);
-                                
getProcessGroup().startProcessor(StandardProcessorNode.this, true);
-                            } else if (desired == ScheduledState.DISABLED) {
-                                final boolean updated = 
scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
-
-                                if (updated) {
-                                    LOG.info("Finished stopping {} but desired 
state is now DISABLED so disabled processor", this);
+                                    triggerOnStopped(processContext);
                                 } else {
-                                    LOG.info("Finished stopping {} but desired 
state is now DISABLED. Scheduled State could not be transitioned from STOPPED 
to DISABLED, "
-                                        + "though, so will allow the other 
thread to finish state transition. Current state is {}", this, 
scheduledState.get());
+                                    LOG.debug("Will not trigger @OnStopped 
methods of {} because ProcessorStopLifecycleMethods.isTriggerOnStopped() = 
false", this);
+                                }
+                            } finally {
+                                lifecycleState.decrementActiveThreadCount();
+                                completeStopAction();
+
+                                // This can happen only when we join a 
cluster. In such a case, we can inherit a flow from the cluster that says that
+                                // the Processor is to be running. However, if 
the Processor is already in the process of stopping, we cannot immediately
+                                // start running the Processor. As a result, 
we check here, since the Processor is stopped, and then immediately start the
+                                // Processor if need be.
+                                final ScheduledState desired = 
StandardProcessorNode.this.desiredState;
+                                if (desired == ScheduledState.RUNNING) {
+                                    LOG.info("Finished stopping {} but desired 
state is now RUNNING so will start processor", this);
+                                    
getProcessGroup().startProcessor(StandardProcessorNode.this, true);
+                                } else if (desired == ScheduledState.DISABLED) 
{
+                                    final boolean updated = 
scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
+
+                                    if (updated) {
+                                        LOG.info("Finished stopping {} but 
desired state is now DISABLED so disabled processor", this);
+                                    } else {
+                                        LOG.info("Finished stopping {} but 
desired state is now DISABLED. Scheduled State could not be transitioned from 
STOPPED to DISABLED, "
+                                                 + "though, so will allow the 
other thread to finish state transition. Current state is {}", this, 
scheduledState.get());
+                                    }
                                 }
                             }
                         } else {
@@ -1900,6 +1899,26 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         return future;
     }
 
+    @Override
+    public void triggerOnUnscheduled(final ProcessContext processContext) {
+        triggerLifecycleMethod(processContext, OnUnscheduled.class);
+    }
+
+    private void triggerOnStopped(final ProcessContext processContext) {
+        triggerLifecycleMethod(processContext, OnStopped.class);
+    }
+
+    private void triggerLifecycleMethod(final ProcessContext processContext, 
final Class<? extends Annotation> lifecycleAnnotation) {
+        final Processor processor = processorRef.get().getProcessor();
+
+        activateThread();
+        try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(getExtensionManager(), 
processor.getClass(), processor.getIdentifier())) {
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(lifecycleAnnotation, 
processor, processContext);
+        } finally {
+            deactivateThread();
+        }
+    }
+
     /**
      * Marks the processor as fully stopped, and completes any futures that 
are to be completed as a result
      */
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index b49d52d113..426b30c2cf 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -69,6 +69,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
 import org.apache.nifi.flow.VersionedProcessGroup;
 import 
org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer;
 import 
org.apache.nifi.flow.synchronization.VersionedFlowSynchronizationContext;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.ExtensionManager;
@@ -588,14 +589,15 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     }
 
     @Override
-    public CompletableFuture<Void> stopComponents() {
+    public CompletableFuture<Void> stopComponents(final 
ProcessorStopLifecycleMethods processorStopLifecycleMethods) {
         readLock.lock();
         try {
             final List<CompletableFuture<Void>> futures = new ArrayList<>();
 
             
getProcessors().stream().filter(STOP_PROCESSORS_FILTER).forEach(node -> {
                 try {
-                    futures.add(node.getProcessGroup().stopProcessor(node));
+                    final StandardProcessGroup immediateGroup = 
(StandardProcessGroup) node.getProcessGroup();
+                    futures.add(immediateGroup.stopProcessor(node, 
processorStopLifecycleMethods));
                 } catch (final Throwable t) {
                     LOG.error("Unable to stop processor {}", 
node.getIdentifier(), t);
                 }
@@ -1752,7 +1754,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             processor.reloadAdditionalResourcesIfNecessary();
 
             return scheduler.runProcessorOnce(processor, stopCallback);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             processor.getLogger().error("Error while running processor {} 
once.", processor, e);
             return stopProcessor(processor);
         } finally {
@@ -1826,6 +1828,17 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
     @Override
     public CompletableFuture<Void> stopProcessor(final ProcessorNode 
processor) {
+        // When using Stateless Engine, we do not want to trigger lifecycle 
methods because the Stateless engine will create N
+        // ProcessorNode's, one for each Concurrent Task and use those. 
Therefore, we do not want to trigger any lifecycle events
+        // on this ProcessorNode object, but we do need to call 
stopProcessor() to ensure that we keep appropriate state
+        // about whether the Processor is scheduled, etc.
+        final ProcessorStopLifecycleMethods lifecycleMethods = 
resolveExecutionEngine() == ExecutionEngine.STATELESS
+            ? ProcessorStopLifecycleMethods.TRIGGER_NONE : 
ProcessorStopLifecycleMethods.TRIGGER_ALL;
+
+        return stopProcessor(processor, lifecycleMethods);
+    }
+
+    private CompletableFuture<Void> stopProcessor(final ProcessorNode 
processor, final ProcessorStopLifecycleMethods lifecycleMethods) {
         readLock.lock();
         try {
             if (!processors.containsKey(processor.getIdentifier())) {
@@ -1837,7 +1850,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 throw new IllegalStateException("Processor is disabled");
             }
 
-            return scheduler.stopProcessor(processor);
+            return scheduler.stopProcessor(processor, lifecycleMethods);
         } finally {
             readLock.unlock();
         }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index 416b774ee5..2df3cb477a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -22,6 +22,7 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.groups.StatelessGroupNode;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
@@ -80,9 +81,10 @@ public interface ProcessScheduler {
      * interrupt any threads that are currently running within the given
      * Processor. If the Processor is not scheduled to run, does nothing.
      *
-     * @param procNode to stop
+     * @param procNode the Processor to stop
+     * @param lifecycleMethods the lifecycle methods to invoke when stopping 
the processor
      */
-    CompletableFuture<Void> stopProcessor(ProcessorNode procNode);
+    CompletableFuture<Void> stopProcessor(ProcessorNode procNode, 
ProcessorStopLifecycleMethods lifecycleMethods);
 
     /**
      * Interrupts all threads that are currently active in the Processor in an 
attempt to
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index be53eabf81..7286ebd964 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller;
 
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.validation.ValidationStatus;
@@ -25,6 +26,7 @@ import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.scheduling.LifecycleState;
 import org.apache.nifi.controller.scheduling.SchedulingAgent;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.migration.ControllerServiceFactory;
@@ -240,11 +242,20 @@ public abstract class ProcessorNode extends 
AbstractComponentNode implements Con
      * @param scheduleState
      *            the ScheduleState that can be used to ensure that the 
running state (STOPPED, RUNNING, etc.)
      *            as well as the active thread counts are kept in sync
-     * @param triggerLifecycleMethods
-     *            whether or not to trigger lifecycle methods such as 
@OnScheduled, @OnStopped, etc.
+     * @param lifecycleMethods
+     *            Indicates which lifecycle methods should be triggered to run
      */
     public abstract CompletableFuture<Void> stop(ProcessScheduler 
processScheduler, ScheduledExecutorService executor,
-        ProcessContext processContext, SchedulingAgent schedulingAgent, 
LifecycleState scheduleState, boolean triggerLifecycleMethods);
+        ProcessContext processContext, SchedulingAgent schedulingAgent, 
LifecycleState scheduleState, ProcessorStopLifecycleMethods lifecycleMethods);
+
+    /**
+     * Triggers any method with the {@link OnUnscheduled} annotation on the 
Processor. This does not stop scheduling the Processor, change any state
+     * of the ProcessorNode, or invoke any other lifecycle methods. This 
method should be used only in order to signal to a Processor that it may
+     * perform any necessary duties that are associated with no longer being 
scheduled, such as proactively ending an onTrigger cycle.
+     *
+     * @param processContext the ProcessContext that may be provided to the 
methods annotated with {@link OnUnscheduled}.
+     */
+    public abstract void triggerOnUnscheduled(ProcessContext processContext);
 
     /**
      * Marks all active tasks as terminated and interrupts all active threads
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
index fa7312eac3..a3095a86ea 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
@@ -19,6 +19,8 @@ package org.apache.nifi.controller.scheduling;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.controller.repository.ActiveProcessSessionFactory;
 import org.apache.nifi.processor.exception.TerminatedTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -31,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class LifecycleState {
+    private static final Logger logger = 
LoggerFactory.getLogger(LifecycleState.class);
 
     private final Object componentId;
     private final AtomicInteger activeThreadCount = new AtomicInteger(0);
@@ -74,7 +77,12 @@ public class LifecycleState {
             activeProcessSessionFactories.put(sessionFactory, null);
         }
 
-        return activeThreadCount.incrementAndGet();
+        final int updatedThreadCount = activeThreadCount.incrementAndGet();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Active Thread Count for {} incremented to {}", 
componentId, updatedThreadCount, new RuntimeException("Stack Trace for 
Debugging"));
+        }
+
+        return updatedThreadCount;
     }
 
     public synchronized int decrementActiveThreadCount() {
@@ -82,7 +90,12 @@ public class LifecycleState {
             return activeThreadCount.get();
         }
 
-        return activeThreadCount.decrementAndGet();
+        final int updatedThreadCount = activeThreadCount.decrementAndGet();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Active Thread Count for {} decremented to {}", 
componentId, updatedThreadCount, new RuntimeException("Stack Trace for 
Debugging"));
+        }
+
+        return updatedThreadCount;
     }
 
     public int getActiveThreadCount() {
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 4a53e453f2..1dd9544985 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -35,6 +35,7 @@ import 
org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flow.ExecutionEngine;
 import org.apache.nifi.flow.VersionedExternalFlow;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.parameter.ParameterContext;
 import org.apache.nifi.parameter.ParameterUpdate;
 import org.apache.nifi.processor.Processor;
@@ -177,7 +178,17 @@ public interface ProcessGroup extends 
ComponentAuthorizable, Positionable, Versi
      * Stops all Processors, Local Ports, and Funnels that are directly within
      * this group and child ProcessGroups, except for those that are disabled.
      */
-    CompletableFuture<Void> stopComponents();
+    default CompletableFuture<Void> stopComponents() {
+        return stopComponents(ProcessorStopLifecycleMethods.TRIGGER_ALL);
+    }
+
+    /**
+     * Stops all Processors, Local Ports, and Funnels that are directly within
+     * this group and child ProcessGroups, except for those that are disabled.
+     *
+     * @param processorStopLifecycleMethods indicates which lifecycle methods 
should be called when stopping Processors
+     */
+    CompletableFuture<Void> stopComponents(ProcessorStopLifecycleMethods 
processorStopLifecycleMethods);
 
     /**
      * @return the scheduled state for this ProcessGroup, or 
StatelessGroupScheduledState.STOPPED
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/lifecycle/ProcessorStopLifecycleMethods.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/lifecycle/ProcessorStopLifecycleMethods.java
new file mode 100644
index 0000000000..8c9081e17f
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/lifecycle/ProcessorStopLifecycleMethods.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lifecycle;
+
+public interface ProcessorStopLifecycleMethods {
+
+    boolean isTriggerOnUnscheduled();
+
+    boolean isTriggerOnStopped();
+
+    ProcessorStopLifecycleMethods TRIGGER_ALL = new 
ProcessorStopLifecycleMethods() {
+        @Override
+        public boolean isTriggerOnUnscheduled() {
+            return true;
+        }
+
+        @Override
+        public boolean isTriggerOnStopped() {
+            return true;
+        }
+    };
+
+    ProcessorStopLifecycleMethods TRIGGER_NONE = new 
ProcessorStopLifecycleMethods() {
+        @Override
+        public boolean isTriggerOnUnscheduled() {
+            return false;
+        }
+
+        @Override
+        public boolean isTriggerOnStopped() {
+            return false;
+        }
+    };
+
+    ProcessorStopLifecycleMethods TRIGGER_ONSTOPPED = new 
ProcessorStopLifecycleMethods() {
+        @Override
+        public boolean isTriggerOnUnscheduled() {
+            return false;
+        }
+
+        @Override
+        public boolean isTriggerOnStopped() {
+            return true;
+        }
+    };
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
index 1d9aae1344..3658b74763 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
@@ -70,7 +70,6 @@ import org.apache.nifi.stateless.engine.StatelessFlowManager;
 import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
 import org.apache.nifi.stateless.repository.RepositoryContextFactory;
 import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
-import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
 import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
 
 import java.time.Duration;
@@ -119,7 +118,6 @@ public class StandardStatelessGroupNodeFactory implements 
StatelessGroupNodeFact
         final FlowFileRepository underlyingFlowFileRepository = 
flowController.getRepositoryContextFactory().getFlowFileRepository();
         final StatelessFlowFileRepository flowFileRepository = new 
StatelessBridgeFlowFileRepository(underlyingFlowFileRepository, 
resourceClaimManager);
 
-        final StatelessProvenanceRepository statelessProvenanceRepository = 
new StatelessProvenanceRepository(1_000);
         flowFileRepository.initialize(resourceClaimManager);
 
         final ContentRepository contentRepository = new 
NonPurgeableContentRepository(flowController.getRepositoryContextFactory().getContentRepository());
@@ -128,7 +126,6 @@ public class StandardStatelessGroupNodeFactory implements 
StatelessGroupNodeFact
             flowFileRepository,
             flowController.getFlowFileEventRepository(),
             flowController.getCounterRepository(),
-            statelessProvenanceRepository,
             flowController.getStateManagerProvider());
 
         final FlowMappingOptions flowMappingOptions = new 
FlowMappingOptions.Builder()
@@ -226,7 +223,8 @@ public class StandardStatelessGroupNodeFactory implements 
StatelessGroupNodeFact
             }
         };
 
-        final StatelessProcessScheduler statelessScheduler = new 
StatelessProcessScheduler(flowController.getExtensionManager(), Duration.of(10, 
ChronoUnit.SECONDS));
+        final StatelessProcessScheduler statelessScheduler = new 
StatelessProcessScheduler(flowController.getExtensionManager(),
+            flowController.getLifecycleStateManager(), Duration.of(10, 
ChronoUnit.SECONDS));
 
         final StatelessStateManagerProvider stateManagerProvider = new 
StatelessStateManagerProvider();
         final StatelessEngine statelessEngine = new 
StandardStatelessEngine.Builder()
@@ -242,6 +240,7 @@ public class StandardStatelessGroupNodeFactory implements 
StatelessGroupNodeFact
             .stateManagerProvider(stateManagerProvider)
             .kerberosConfiguration(kerberosConfig)
             .statusTaskInterval(null)
+            .lifecycleStateManager(flowController.getLifecycleStateManager())
             .build();
 
         final BulletinRepository bulletinRepository = 
flowController.getBulletinRepository();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 6401ae0375..1a7a7dbb57 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -45,6 +45,7 @@ import 
org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.flow.ExecutionEngine;
 import org.apache.nifi.groups.StatelessGroupNode;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.StandardLoggingContext;
 import org.apache.nifi.nar.NarCloseable;
@@ -544,21 +545,20 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
     /**
      * Stops the given {@link Processor} by invoking its
-     * {@link ProcessorNode#stop(ProcessScheduler, ScheduledExecutorService, 
ProcessContext, SchedulingAgent, LifecycleState, boolean)}
+     * {@link ProcessorNode#stop(ProcessScheduler, ScheduledExecutorService, 
ProcessContext, SchedulingAgent, LifecycleState, ProcessorStopLifecycleMethods)}
      * method.
      *
-     * @see StandardProcessorNode#stop(ProcessScheduler, 
ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState, 
boolean)
+     * @see StandardProcessorNode#stop(ProcessScheduler, 
ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState, 
ProcessorStopLifecycleMethods)
      */
     @Override
-    public synchronized CompletableFuture<Void> stopProcessor(final 
ProcessorNode procNode) {
+    public synchronized CompletableFuture<Void> stopProcessor(final 
ProcessorNode procNode, final ProcessorStopLifecycleMethods lifecycleMethods) {
         final LifecycleState lifecycleState = getLifecycleState(procNode, 
false, false);
 
-        StandardProcessContext processContext = new 
StandardProcessContext(procNode, getControllerServiceProvider(),
+        final StandardProcessContext processContext = new 
StandardProcessContext(procNode, getControllerServiceProvider(),
             getStateManager(procNode.getIdentifier()), 
lifecycleState::isTerminated, flowController);
 
-        final boolean triggerLifecycleMethods = 
procNode.getProcessGroup().resolveExecutionEngine() != 
ExecutionEngine.STATELESS;
         LOG.info("Stopping {}", procNode);
-        return procNode.stop(this, this.componentLifeCycleThreadPool, 
processContext, getSchedulingAgent(procNode), lifecycleState, 
triggerLifecycleMethods);
+        return procNode.stop(this, this.componentLifeCycleThreadPool, 
processContext, getSchedulingAgent(procNode), lifecycleState, lifecycleMethods);
     }
 
     @Override
@@ -844,8 +844,23 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     }
 
     private CompletableFuture<Void> enableControllerService(final 
ControllerServiceNode service, final boolean completeFutureExceptionally) {
+        if (service.isActive()) {
+            LOG.debug("{} is already active, so not enabling it again", 
service);
+            return CompletableFuture.completedFuture(null);
+        }
+
         LOG.info("Enabling {}", service);
-        return service.enable(this.componentLifeCycleThreadPool, 
this.administrativeYieldMillis, completeFutureExceptionally);
+
+        final List<CompletableFuture<Void>> futures = new ArrayList<>();
+        final List<ControllerServiceNode> dependentServices = 
service.getRequiredControllerServices();
+        for (final ControllerServiceNode dependentService : dependentServices) 
{
+            // Enable each required Controller Service as well, passing along the 
caller's completeFutureExceptionally flag so that required services
+            // are enabled with the same failure-handling setting as the 
requested service itself.
+            futures.add(enableControllerService(dependentService, 
completeFutureExceptionally));
+        }
+
+        futures.add(service.enable(this.componentLifeCycleThreadPool, 
this.administrativeYieldMillis, completeFutureExceptionally));
+        return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0]));
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
index 9ef1fe9103..6f0f96723d 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
@@ -43,6 +43,7 @@ import 
org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.tasks.StatelessFlowTask;
 import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.processor.ProcessContext;
@@ -329,7 +330,7 @@ public class StandardStatelessGroupNode implements 
StatelessGroupNode {
             .transactionThresholds(TransactionThresholds.UNLIMITED)
             .build();
 
-        final ProcessScheduler processScheduler = new 
StatelessProcessScheduler(extensionManager, Duration.of(10, 
ChronoUnit.SECONDS)) {
+        final ProcessScheduler processScheduler = new 
StatelessProcessScheduler(extensionManager, lifecycleStateManager, 
Duration.of(10, ChronoUnit.SECONDS)) {
             @Override
             public void yield(final ProcessorNode procNode) {
                 if (isSourceProcessor(procNode)) {
@@ -461,7 +462,7 @@ public class StandardStatelessGroupNode implements 
StatelessGroupNode {
         final List<CompletableFuture<Void>> futures = new ArrayList<>();
 
         for (final ProcessorNode procNode : 
getProcessGroup().findAllProcessors()) {
-            final CompletableFuture<Void> stopProcessorFuture = 
scheduler.stopProcessor(procNode);
+            final CompletableFuture<Void> stopProcessorFuture = 
scheduler.stopProcessor(procNode, ProcessorStopLifecycleMethods.TRIGGER_NONE);
             futures.add(stopProcessorFuture);
         }
 
@@ -482,7 +483,6 @@ public class StandardStatelessGroupNode implements 
StatelessGroupNode {
     }
 
 
-
     private void stopPorts() {
         final List<Port> allPorts = new ArrayList<>();
         allPorts.addAll(getProcessGroup().findAllInputPorts());
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 250dd5d67d..bcbf5b375a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -52,6 +52,7 @@ import 
org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.service.mock.MockProcessGroup;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.ExtensionDiscoveringManager;
 import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
@@ -272,7 +273,7 @@ public class TestStandardProcessScheduler {
 
         Thread.sleep(25L);
 
-        scheduler.stopProcessor(procNode);
+        scheduler.stopProcessor(procNode, 
ProcessorStopLifecycleMethods.TRIGGER_ALL);
         assertTrue(service.isActive());
         assertSame(ControllerServiceState.ENABLING, service.getState());
         scheduler.disableControllerService(service).get();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index f96964bf44..9000e35aa3 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -48,6 +48,7 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.StatelessGroupNode;
 import org.apache.nifi.groups.StatelessGroupScheduledState;
 import org.apache.nifi.groups.VersionedComponentAdditions;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.parameter.ParameterContext;
 import org.apache.nifi.parameter.ParameterUpdate;
 import org.apache.nifi.registry.flow.FlowLocation;
@@ -163,7 +164,7 @@ public class MockProcessGroup implements ProcessGroup {
     }
 
     @Override
-    public CompletableFuture<Void> stopComponents() {
+    public CompletableFuture<Void> stopComponents(final 
ProcessorStopLifecycleMethods processorStopLifecycleMethods) {
         return CompletableFuture.completedFuture(null);
     }
 
diff --git 
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
 
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index dc4a26dca5..9ae5efc882 100644
--- 
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++ 
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -24,7 +24,9 @@ import org.apache.nifi.reporting.BulletinRepository;
 import java.io.InputStream;
 import java.time.Duration;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 public interface StatelessDataflow {
     /**
@@ -92,22 +94,13 @@ public interface StatelessDataflow {
 
     boolean isFlowFileQueued();
 
-    /**
-     *
-     * @return True if there are any processors in the dataflow with the 
{@link org.apache.nifi.annotation.behavior.Stateful} annotation
-     */
-    boolean isStateful();
-
     void purge();
 
     Map<String, String> getComponentStates(Scope scope);
 
-    void setComponentStates(Map<String, String> componentStates, Scope scope);
-
-    boolean isSourcePrimaryNodeOnly();
-
-    long getSourceYieldExpiration();
-
     BulletinRepository getBulletinRepository();
 
+    OptionalLong getCounter(String componentId, String counterName);
+
+    Map<String, Long> getCounters(Pattern counterNamePattern);
 }
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index 9ec55ddfb5..555c1373ea 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -21,19 +21,20 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
-import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.SchedulingAgentCallback;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.groups.StatelessGroupNode;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.ProcessContext;
@@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import java.lang.reflect.InvocationTargetException;
 import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -65,6 +67,7 @@ public class StatelessProcessScheduler implements 
ProcessScheduler {
 
     private final SchedulingAgent schedulingAgent;
     private final ExtensionManager extensionManager;
+    private final LifecycleStateManager lifecycleStateManager;
 
     private FlowEngine componentLifeCycleThreadPool;
     private ScheduledExecutorService componentMonitoringThreadPool;
@@ -74,8 +77,9 @@ public class StatelessProcessScheduler implements 
ProcessScheduler {
 
     private final long processorStartTimeoutMillis;
 
-    public StatelessProcessScheduler(final ExtensionManager extensionManager, 
final Duration processorStartTimeout) {
+    public StatelessProcessScheduler(final ExtensionManager extensionManager, 
final LifecycleStateManager lifecycleStateManager, final Duration 
processorStartTimeout) {
         this.extensionManager = extensionManager;
+        this.lifecycleStateManager = lifecycleStateManager;
         this.processorStartTimeoutMillis = processorStartTimeout.toMillis();
         schedulingAgent = new StatelessSchedulingAgent(extensionManager);
     }
@@ -155,17 +159,31 @@ public class StatelessProcessScheduler implements 
ProcessScheduler {
     }
 
     @Override
-    public CompletableFuture<Void> stopProcessor(final ProcessorNode procNode) 
{
+    public CompletableFuture<Void> stopProcessor(final ProcessorNode procNode, 
final ProcessorStopLifecycleMethods lifecycleMethods) {
         logger.info("Stopping {}", procNode);
         final ProcessContext processContext = 
processContextFactory.createProcessContext(procNode);
-        final LifecycleState lifecycleState = new 
LifecycleState(procNode.getIdentifier());
-        final boolean scheduled = procNode.getScheduledState() == 
ScheduledState.RUNNING || procNode.getActiveThreadCount() > 0;
-        lifecycleState.setScheduled(scheduled);
-        return procNode.stop(this, this.componentLifeCycleThreadPool, 
processContext, schedulingAgent, lifecycleState, true);
+        final LifecycleState lifecycleState = 
lifecycleStateManager.getOrRegisterLifecycleState(procNode.getIdentifier(), 
false, false);
+        return procNode.stop(this, this.componentLifeCycleThreadPool, 
processContext, schedulingAgent, lifecycleState, lifecycleMethods);
     }
 
     @Override
     public void terminateProcessor(final ProcessorNode procNode) {
+        final Optional<LifecycleState> optionalLifecycleState = 
lifecycleStateManager.getLifecycleState(procNode.getIdentifier());
+        if (optionalLifecycleState.isEmpty()) {
+            logger.debug("No Lifecycle State registered for {} so will not 
terminate", procNode);
+            return;
+        }
+
+        final LifecycleState lifecycleState = optionalLifecycleState.get();
+        final int activeThreadCount = lifecycleState.getActiveThreadCount();
+        if (activeThreadCount == 0) {
+            logger.debug("No active threads registered for {} so will not 
terminate", procNode);
+            return;
+        }
+
+        lifecycleState.terminate();
+        final int terminationCounts = procNode.terminate();
+        logger.info("Terminated {} with {} active threads", procNode, 
terminationCounts);
     }
 
     @Override
@@ -243,7 +261,12 @@ public class StatelessProcessScheduler implements 
ProcessScheduler {
 
     @Override
     public int getActiveThreadCount(final Object scheduled) {
-        return 0;
+        if (!(scheduled instanceof final Connectable connectable)) {
+            return 0;
+        }
+
+        final Optional<LifecycleState> optionalLifecycleState = 
lifecycleStateManager.getLifecycleState(connectable.getIdentifier());
+        return 
optionalLifecycleState.map(LifecycleState::getActiveThreadCount).orElse(0);
     }
 
     @Override
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ExecutionProgress.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ExecutionProgress.java
index 21dde30ec9..748815512c 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ExecutionProgress.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ExecutionProgress.java
@@ -64,12 +64,7 @@ public interface ExecutionProgress {
      */
     void notifyExecutionFailed(Throwable cause);
 
-    /**
-     * Indicates whether or not the port with the given name is considered a 
Failure Port
-     * @param portName the name of the port
-     * @return <code>true</code> if the port is a failure port, 
<code>false</code> otherwise
-     */
-    boolean isFailurePort(String portName);
+    boolean isFailurePort(Connectable connectable);
 
     boolean isTerminalPort(Connectable connectable);
 
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
index 24fa2ca4b5..247c4efdc1 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.stateless.engine;
 
+import org.apache.nifi.components.PortFunction;
 import org.apache.nifi.components.state.StatelessStateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
@@ -89,8 +90,12 @@ public class StandardExecutionProgress implements 
ExecutionProgress {
     }
 
     @Override
-    public boolean isFailurePort(final String portName) {
-        return failurePortNames.contains(portName);
+    public boolean isFailurePort(final Connectable connectable) {
+        if (connectable instanceof Port && ((Port) 
connectable).getPortFunction() == PortFunction.FAILURE) {
+            return true;
+        }
+
+        return failurePortNames.contains(connectable.getName());
     }
 
     @Override
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 36e6f49430..63afae2adf 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -17,23 +17,6 @@
 
 package org.apache.nifi.stateless.engine;
 
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.asset.AssetManager;
 import org.apache.nifi.attribute.expression.language.VariableImpact;
@@ -58,7 +41,6 @@ import 
org.apache.nifi.controller.reporting.LogComponentStatuses;
 import org.apache.nifi.controller.repository.CounterRepository;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.scheduling.LifecycleStateManager;
-import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.PropertyEncryptor;
 import org.apache.nifi.engine.FlowEngine;
@@ -96,6 +78,24 @@ import org.apache.nifi.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 import static java.util.Objects.requireNonNull;
 
 public class StandardStatelessEngine implements StatelessEngine {
@@ -116,6 +116,7 @@ public class StandardStatelessEngine implements 
StatelessEngine {
     private final ProvenanceRepository provenanceRepository;
     private final ExtensionRepository extensionRepository;
     private final CounterRepository counterRepository;
+    private final LifecycleStateManager lifecycleStateManager;
     private final Duration statusTaskInterval;
     private final Duration componentEnableTimeout;
 
@@ -142,6 +143,7 @@ public class StandardStatelessEngine implements 
StatelessEngine {
         this.extensionRepository = requireNonNull(builder.extensionRepository, 
"Extension Repository must be provided");
         this.counterRepository = requireNonNull(builder.counterRepository, 
"Counter Repository must be provided");
         this.assetManager = requireNonNull(builder.assetManager, "Asset 
Manager must be provided");
+        this.lifecycleStateManager = 
requireNonNull(builder.lifecycleStateManager, "Lifecycle State Manager must be 
provided");
         this.statusTaskInterval = parseDuration(builder.statusTaskInterval);
         this.componentEnableTimeout = 
parseDuration(builder.componentEnableTimeout);
 
@@ -196,7 +198,6 @@ public class StandardStatelessEngine implements 
StatelessEngine {
         overrideParameters(parameterContextMap, parameterValueProvider);
 
         final List<ReportingTaskNode> reportingTaskNodes = 
createReportingTasks(dataflowDefinition);
-        final LifecycleStateManager lifecycleStateManager = new 
StandardLifecycleStateManager();
         final StandardStatelessFlow dataflow = new 
StandardStatelessFlow(childGroup, reportingTaskNodes, 
controllerServiceProvider, processContextFactory,
             repositoryContextFactory, dataflowDefinition, 
stateManagerProvider, processScheduler, bulletinRepository, 
lifecycleStateManager, componentEnableTimeout);
 
@@ -684,6 +685,7 @@ public class StandardStatelessEngine implements 
StatelessEngine {
         private CounterRepository counterRepository = null;
         private String statusTaskInterval = null;
         private String componentEnableTimeout = null;
+        private LifecycleStateManager lifecycleStateManager = null;
         private AssetManager assetManager = null;
 
         public Builder extensionManager(final ExtensionManager 
extensionManager) {
@@ -751,6 +753,11 @@ public class StandardStatelessEngine implements 
StatelessEngine {
             return this;
         }
 
+        public Builder lifecycleStateManager(final LifecycleStateManager 
lifecycleStateManager) {
+            this.lifecycleStateManager = lifecycleStateManager;
+            return this;
+        }
+
         public StandardStatelessEngine build() {
             return new StandardStatelessEngine(this);
         }
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index 9fbba530e5..84b5051f4e 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -33,6 +33,8 @@ import 
org.apache.nifi.controller.repository.StandardCounterRepository;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.scheduling.LifecycleStateManager;
+import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager;
 import org.apache.nifi.controller.scheduling.StatelessProcessScheduler;
 import 
org.apache.nifi.controller.scheduling.StatelessProcessSchedulerInitializationContext;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -156,8 +158,9 @@ public class StandardStatelessDataflowFactory implements 
StatelessDataflowFactor
             final StatelessStateManagerProvider stateManagerProvider = new 
StatelessStateManagerProvider();
 
             final ParameterContextManager parameterContextManager = new 
StandardParameterContextManager();
+            final LifecycleStateManager lifecycleStateManager = new 
StandardLifecycleStateManager();
             final Duration processorStartTimeoutDuration = 
Duration.ofSeconds((long) 
FormatUtils.getPreciseTimeDuration(engineConfiguration.getProcessorStartTimeout(),
 TimeUnit.SECONDS));
-            processScheduler = new StatelessProcessScheduler(extensionManager, 
processorStartTimeoutDuration);
+            processScheduler = new StatelessProcessScheduler(extensionManager, 
lifecycleStateManager, processorStartTimeoutDuration);
             provenanceRepo = new StatelessProvenanceRepository(1_000);
             provenanceRepo.initialize(EventReporter.NO_OP, new 
StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), 
IdentifierLookup.EMPTY);
 
@@ -226,6 +229,7 @@ public class StandardStatelessDataflowFactory implements 
StatelessDataflowFactor
                     .counterRepository(counterRepo)
                     
.statusTaskInterval(engineConfiguration.getStatusTaskInterval())
                     
.componentEnableTimeout(engineConfiguration.getComponentEnableTimeout())
+                    .lifecycleStateManager(lifecycleStateManager)
                     .build();
 
             final StatelessFlowManager flowManager = new 
StatelessFlowManager(flowFileEventRepo, parameterContextManager, 
statelessEngine, () -> true, sslContext, bulletinRepository);
@@ -240,7 +244,7 @@ public class StandardStatelessDataflowFactory implements 
StatelessDataflowFactor
             flowFileRepo = new StatelessFlowFileRepository();
 
             final RepositoryContextFactory repositoryContextFactory = new 
StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
-                counterRepo, provenanceRepo, stateManagerProvider);
+                counterRepo, stateManagerProvider);
             final StatelessEngineInitializationContext 
statelessEngineInitializationContext = new 
StatelessEngineInitializationContext(controllerServiceProvider, flowManager, 
processContextFactory,
                 repositoryContextFactory);
 
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 330b2a0f92..d94a7c25da 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -31,12 +31,14 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.Counter;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.RepositoryContext;
 import org.apache.nifi.controller.repository.RepositoryRecord;
@@ -46,11 +48,12 @@ import 
org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
 import org.apache.nifi.controller.scheduling.LifecycleStateManager;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
-import org.apache.nifi.controller.state.StandardStateMap;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
@@ -84,7 +87,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -95,6 +100,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 public class StandardStatelessFlow implements StatelessDataflow {
@@ -126,8 +133,8 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
     private volatile ExecutorService runDataflowExecutor;
     private volatile ScheduledExecutorService backgroundTaskExecutor;
     private volatile boolean initialized = false;
-    private volatile Boolean stateful = null;
     private volatile boolean shutdown = false;
+    private volatile boolean manageControllerServices = true;
 
     public StandardStatelessFlow(final ProcessGroup rootGroup, final 
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider 
controllerServiceProvider,
                                  final ProcessContextFactory 
processContextFactory, final RepositoryContextFactory repositoryContextFactory, 
final DataflowDefinition dataflowDefinition,
@@ -236,6 +243,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         }
 
         initialized = true;
+        manageControllerServices = 
initializationContext.isEnableControllerServices();
 
         // Trigger validation to occur so that components can be 
enabled/started.
         performValidation();
@@ -370,29 +378,48 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         }
 
         logger.info("Stopping all components");
-        final CompletableFuture<Void> stopComponentsFuture = 
rootGroup.stopComponents();
+
+        final Set<ProcessorNode> runningProcessors = new 
HashSet<>(rootGroup.findAllProcessors());
+        unscheduleProcessors(runningProcessors);
 
         // Wait for the graceful shutdown period for all processors to stop. 
If the processors do not stop within this time,
         // then interrupt them.
         if (runDataflowExecutor != null && interruptProcessors) {
             if (gracefulShutdownDuration.isZero()) {
                 logger.info("Shutting down all components immediately without 
waiting for graceful shutdown period");
+                tracker.triggerFailureCallbacks(new TerminatedTaskException());
                 runDataflowExecutor.shutdownNow();
             } else {
-                try {
-                    
stopComponentsFuture.get(gracefulShutdownDuration.toMillis(), 
TimeUnit.MILLISECONDS);
-                } catch (final Exception e) {
-                    logger.warn("Stateless flow failed to stop all components 
gracefully after {} millis. Interrupting all running components.", 
gracefulShutdownDuration.toMillis(), e);
-                    if (e instanceof InterruptedException) {
-                        Thread.interrupted();
-                    }
-
+                final boolean gracefullyStopped = 
waitForProcessorThreadsToComplete(runningProcessors, gracefulShutdownDuration);
+                if (gracefullyStopped) {
+                    logger.info("All Processors have finished running; 
triggering session callbacks");
+                    tracker.triggerCallbacks();
+                } else {
+                    logger.warn("{} Processors did not finish running within 
the graceful shutdown period of {} millis. Interrupting all running components. 
Processors still running: {}",
+                        runningProcessors.size(), 
gracefulShutdownDuration.toMillis(), runningProcessors);
+                    tracker.triggerFailureCallbacks(new 
TerminatedTaskException());
                     runDataflowExecutor.shutdownNow();
                 }
             }
+        } else if (runDataflowExecutor != null) {
+            waitForProcessorThreadsToComplete(runningProcessors, 
gracefulShutdownDuration);
+            tracker.triggerCallbacks();
+        }
+
+        // Stop components but do not trigger @OnUnscheduled because those 
were already triggered.
+        final CompletableFuture<Void> stopFuture = 
rootGroup.stopComponents(ProcessorStopLifecycleMethods.TRIGGER_ONSTOPPED);
+        try {
+            stopFuture.get(gracefulShutdownDuration.toMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        } catch (final Exception ignored) {
+            // Whether the Processors stopped gracefully or not doesn't 
matter, we will continue to terminate any active processor.
+        }
+
+        for (final ProcessorNode processorNode : 
rootGroup.findAllProcessors()) {
+            processScheduler.terminateProcessor(processorNode);
         }
 
-        stopComponentsFuture.join();
         
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
 
         if (triggerComponentShutdown) {
@@ -402,23 +429,32 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         reportingTasks.forEach(processScheduler::unschedule);
 
         final Set<ControllerServiceNode> allControllerServices = 
rootGroup.findAllControllerServices();
-        logger.info("Disabling {} Controller Services", 
allControllerServices.size());
 
-        try {
-            
controllerServiceProvider.disableControllerServicesAsync(allControllerServices).get();
-        } catch (final InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            logger.error("Failed to properly disable one or more of the 
following Controller Services: {} due to being interrupted while waiting for 
them to disable", allControllerServices, ie);
-        } catch (final Exception e) {
-            logger.error("Failed to properly disable one or more of the 
following Controller Services: {}", allControllerServices, e);
+        if (manageControllerServices) {
+            logger.info("Disabling {} Controller Services", 
allControllerServices.size());
+
+            if (!allControllerServices.isEmpty()) {
+                try {
+                    
controllerServiceProvider.disableControllerServicesAsync(allControllerServices).get();
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    logger.error("Failed to properly disable one or more of 
the following Controller Services due to being interrupted while waiting for 
them to disable: {}",
+                        allControllerServices, ie);
+                } catch (final Exception e) {
+                    logger.error("Failed to properly disable one or more of 
the following Controller Services: {}", allControllerServices, e);
+                }
+
+                logger.info("Finished disabling all Controller Services");
+            }
         }
 
-        logger.info("Finished disabling all Controller Services");
         stateManagerProvider.shutdown();
 
         // invoke any methods annotated with @OnShutdown on Controller Services
         if (triggerComponentShutdown) {
-            allControllerServices.forEach(cs -> 
processScheduler.shutdownControllerService(cs, controllerServiceProvider));
+            if (manageControllerServices) {
+                allControllerServices.forEach(cs -> 
processScheduler.shutdownControllerService(cs, controllerServiceProvider));
+            }
 
             // invoke any methods annotated with @OnShutdown on Reporting Tasks
             reportingTasks.forEach(processScheduler::shutdownReportingTask);
@@ -430,6 +466,51 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         logger.info("Finished shutting down dataflow");
     }
 
+    private void unscheduleProcessors(final Set<ProcessorNode> 
runningProcessors) {
+        for (final ProcessorNode processor : runningProcessors) {
+            if (!processor.isRunning()) {
+                continue;
+            }
+
+            final ProcessContext processContext = 
processContextFactory.createProcessContext(processor);
+            processor.triggerOnUnscheduled(processContext);
+        }
+    }
+
+    private boolean waitForProcessorThreadsToComplete(final Set<ProcessorNode> 
runningProcessors, final Duration gracefulShutdownDuration) {
+        final long maxEndTime = System.currentTimeMillis() + 
gracefulShutdownDuration.toMillis();
+
+        // While there are still elements in the runningProcessors set, we 
will continue to check if they have stopped.
+        while (!runningProcessors.isEmpty()) {
+            // Get a list of all stopped processors so that we can remove 
them. We must convert to a List and then remove
+            // in order to avoid a ConcurrentModificationException.
+            final List<ProcessorNode> stopped = runningProcessors.stream()
+                .filter(proc -> proc.getActiveThreadCount() == 0)
+                .toList();
+
+            stopped.forEach(runningProcessors::remove);
+
+            // If there are no running processors left, then we can return 
true. Otherwise, if we've reached our max time,
+            // we return false. If neither stopping condition is met, sleep 
for a bit and check again.
+            if (runningProcessors.isEmpty()) {
+                return true;
+            }
+
+            if (System.currentTimeMillis() > maxEndTime) {
+                return false;
+            }
+
+            try {
+                Thread.sleep(10);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore flag for callers
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     @Override
     public StatelessDataflowValidation performValidation() {
         final Map<ComponentNode, List<ValidationResult>> resultsMap = new 
HashMap<>();
@@ -465,15 +546,26 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
 
     private void enableControllerServices(final ProcessGroup processGroup) {
         final Set<ControllerServiceNode> services = 
processGroup.getControllerServices(false);
-        for (final ControllerServiceNode serviceNode : services) {
-            final Future<?> future = 
controllerServiceProvider.enableControllerServiceAndDependencies(serviceNode);
+        final Future<Void> future = 
controllerServiceProvider.enableControllerServicesAsync(services);
 
-            try {
-                future.get(this.componentEnableTimeoutMillis, 
TimeUnit.MILLISECONDS);
-            } catch (final Exception e) {
-                throw new IllegalStateException("Controller Service " + 
serviceNode + " has not fully enabled. Current Validation Status is "
-                    + serviceNode.getValidationStatus() + " with validation 
Errors: " + serviceNode.getValidationErrors(), e);
+        try {
+            future.get(this.componentEnableTimeoutMillis, 
TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            final StringBuilder validationMessage = new StringBuilder("The 
following Controller Services have not fully enabled:\n");
+            for (final ControllerServiceNode serviceNode : services) {
+                if (serviceNode.getState() == ControllerServiceState.ENABLED) {
+                    continue;
+                }
+
+                validationMessage.append("Controller Service 
").append(serviceNode)
+                    .append(" has Validation Status ")
+                    .append(serviceNode.getValidationStatus())
+                    .append(" with validation Errors: ")
+                    .append(serviceNode.getValidationErrors())
+                    .append("\n");
             }
+
+            throw new 
IllegalStateException(validationMessage.toString().trim(), e);
         }
 
         
processGroup.getProcessGroups().forEach(this::enableControllerServices);
@@ -609,17 +701,6 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         }
     }
 
-    @Override
-    public boolean isStateful() {
-        if (stateful == null) {
-            final boolean hasStatefulReportingTask = 
reportingTasks.stream().anyMatch(this::isStateful);
-            if (hasStatefulReportingTask) {
-                return true;
-            }
-            stateful = isStateful(rootGroup);
-        }
-        return stateful;
-    }
 
     private boolean isStateful(final ProcessGroup processGroup) {
         final boolean hasStatefulProcessor = 
processGroup.getProcessors().stream().anyMatch(this::isStateful);
@@ -783,63 +864,38 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
     }
 
     @Override
-    public void setComponentStates(final Map<String, String> componentStates, 
final Scope scope) {
-        final Map<String, StateMap> stateMaps = 
deserializeStateMaps(componentStates);
-        stateManagerProvider.updateComponentsStates(stateMaps, scope);
-    }
-
-    private Map<String, StateMap> deserializeStateMaps(final Map<String, 
String> componentStates) {
-        if (componentStates == null) {
-            return Collections.emptyMap();
-        }
-
-        final Map<String, StateMap> deserializedStateMaps = new HashMap<>();
-
-        for (final Map.Entry<String, String> entry : 
componentStates.entrySet()) {
-            final String componentId = entry.getKey();
-            final String serialized = entry.getValue();
-
-            final SerializableStateMap deserialized;
-            try {
-                deserialized = objectMapper.readValue(serialized, 
SerializableStateMap.class);
-            } catch (final Exception e) {
-                // We want to avoid throwing an Exception here because if we 
do, we may never be able to run the flow again, at least not without
-                // destroying all state that exists for the component. Would 
be better to simply skip the state for this component
-                logger.error("Failed to deserialized components' state for 
component with ID {}. State will be reset to empty", componentId, e);
-                continue;
-            }
-
-            final StateMap stateMap = new 
StandardStateMap(deserialized.getStateValues(), 
Optional.ofNullable(deserialized.getVersion()));
-            deserializedStateMaps.put(componentId, stateMap);
-        }
-
-        return deserializedStateMaps;
+    public BulletinRepository getBulletinRepository() {
+        return bulletinRepository;
     }
 
     @Override
-    public boolean isSourcePrimaryNodeOnly() {
-        for (final Connectable connectable : rootConnectables) {
-            if (connectable.isIsolated()) {
-                return true;
-            }
-        }
-
-        return false;
+    public OptionalLong getCounter(final String componentId, final String 
counterName) {
+        final String instanceId = findInstanceId(componentId);
+        return findCounter(counter -> counter.getContext().endsWith(" (" + 
instanceId + ")") && counter.getName().equals(counterName));
     }
 
     @Override
-    public long getSourceYieldExpiration() {
-        long latest = 0L;
-        for (final Connectable connectable : rootConnectables) {
-            latest = Math.max(latest, connectable.getYieldExpiration());
-        }
-
-        return latest;
+    public Map<String, Long> getCounters(final Pattern counterNamePattern) {
+        final CounterRepository counterRepository = 
repositoryContextFactory.getCounterRepository();
+        return counterRepository.getCounters().stream()
+            .filter(counter -> !counter.getContext().startsWith("All ") && 
counterNamePattern.matcher(counter.getName()).matches())
+            .collect(Collectors.toMap(Counter::getName, Counter::getValue, 
Long::sum)); // same-named counters from different contexts are summed instead of throwing
     }
 
-    @Override
-    public BulletinRepository getBulletinRepository() {
-        return bulletinRepository;
+    private String findInstanceId(final String componentId) {
+        return rootGroup.findAllProcessors().stream()
+            .filter(processor -> Objects.equals(processor.getIdentifier(), 
componentId) || Objects.equals(processor.getVersionedComponentId().orElse(""), 
componentId))
+            .findFirst()
+            .map(ProcessorNode::getIdentifier)
+            .orElse(null);
+    }
+
+    private OptionalLong findCounter(final Predicate<Counter> predicate) {
+        final CounterRepository counterRepository = 
repositoryContextFactory.getCounterRepository();
+        return counterRepository.getCounters().stream()
+            .filter(predicate)
+            .mapToLong(Counter::getValue)
+            .findFirst();
     }
 
     @SuppressWarnings("unused")
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/RepositoryContextFactory.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/RepositoryContextFactory.java
index 5643fb6d95..f073562802 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/RepositoryContextFactory.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/RepositoryContextFactory.java
@@ -19,6 +19,7 @@ package org.apache.nifi.stateless.repository;
 
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.RepositoryContext;
@@ -33,5 +34,7 @@ public interface RepositoryContextFactory {
 
     FlowFileEventRepository getFlowFileEventRepository();
 
+    CounterRepository getCounterRepository();
+
     void shutdown();
 }
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
index 49cc0e3353..b034d0f82d 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
@@ -42,7 +42,7 @@ public class StatelessRepositoryContextFactory implements 
RepositoryContextFacto
     private final StateManagerProvider stateManagerProvider;
 
     public StatelessRepositoryContextFactory(final ContentRepository 
contentRepository, final FlowFileRepository flowFileRepository, final 
FlowFileEventRepository flowFileEventRepository,
-                                             final CounterRepository 
counterRepository, final ProvenanceEventRepository provenanceRepository, final 
StateManagerProvider stateManagerProvider) {
+                                             final CounterRepository 
counterRepository, final StateManagerProvider stateManagerProvider) {
         this.contentRepository = contentRepository;
         this.flowFileRepository = flowFileRepository;
         this.flowFileEventRepository = flowFileEventRepository;
@@ -71,6 +71,11 @@ public class StatelessRepositoryContextFactory implements 
RepositoryContextFacto
         return flowFileEventRepository;
     }
 
+    @Override
+    public CounterRepository getCounterRepository() {
+        return counterRepository;
+    }
+
     @Override
     public void shutdown() {
         contentRepository.shutdown();
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
index 4f78639b83..7695043f36 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
@@ -17,9 +17,11 @@
 
 package org.apache.nifi.stateless.session;
 
+import org.apache.nifi.components.PortFunction;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.StandardProcessSession;
 import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
@@ -92,8 +94,8 @@ public class StatelessProcessSession extends 
StandardProcessSession {
         // If we don't require synchronous commits, we can trigger the async 
commit, but we can't call the callback yet, because we only can call the 
success callback when we've completed the
         // dataflow in order to ensure that we don't destroy data in a way 
that it can't be replayed if the downstream processors fail.
         if (!requireSynchronousCommits) {
-            super.commitAsync();
             tracker.addCallback(connectable, onSuccess, onFailure, this);
+            super.commitAsync();
             return;
         }
 
@@ -232,7 +234,7 @@ public class StatelessProcessSession extends 
StandardProcessSession {
             return false;
         }
 
-        if (executionProgress.isFailurePort(connectable.getName())) {
+        if (connectable instanceof Port && ((Port) 
connectable).getPortFunction() == PortFunction.FAILURE) {
             return true;
         }
 
diff --git 
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
index 8bc146d089..d735b02483 100644
--- 
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
+++ 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
@@ -43,6 +43,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -67,7 +68,7 @@ public class StatelessSystemIT {
 
     @AfterEach
     public void shutdownFlows() {
-        createdFlows.forEach(StatelessDataflow::shutdown);
+        createdFlows.forEach(dataflow -> dataflow.shutdown(true, true, 
Duration.ofSeconds(1)));
     }
 
     protected StatelessEngineConfiguration getEngineConfiguration() {
diff --git 
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
index 6eb246a384..fe62588992 100644
--- 
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
+++ 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.stateless.basics;
 
+import org.apache.nifi.components.PortFunction;
+import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.flow.VersionedProcessor;
 import org.apache.nifi.stateless.StatelessSystemIT;
 import org.apache.nifi.stateless.VersionedFlowBuilder;
@@ -25,19 +27,35 @@ import org.apache.nifi.stateless.flow.DataflowTrigger;
 import org.apache.nifi.stateless.flow.StatelessDataflow;
 import org.apache.nifi.stateless.flow.TriggerResult;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ProcessorLifecycleIT extends StatelessSystemIT {
+    public static final String STOPPED_AND_SUCCESS = "Stopped and Success";
+    public static final String STOPPED_AND_FAILURE = "Stopped and Failure";
+    public static final String RUNNING_AND_SUCCESS = "Running and Success";
+    public static final String RUNNING_AND_FAILURE = "Running and Failure";
+
     private static final Logger logger = 
LoggerFactory.getLogger(ProcessorLifecycleIT.class);
 
     @Test
@@ -53,7 +71,7 @@ public class ProcessorLifecycleIT extends StatelessSystemIT {
         flowBuilder.createConnection(generate, writeLifecycleEvents, 
"success");
 
         
writeLifecycleEvents.setAutoTerminatedRelationships(Collections.singleton("success"));
-        writeLifecycleEvents.setProperties(Collections.singletonMap("Event 
File", eventsFile.getAbsolutePath()));
+        writeLifecycleEvents.setProperties(Map.of("Event File", 
eventsFile.getAbsolutePath()));
 
         final StatelessDataflow dataflow = 
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
 
@@ -75,4 +93,156 @@ public class ProcessorLifecycleIT extends StatelessSystemIT 
{
 
         assertEquals(Arrays.asList("OnScheduled", "OnTrigger", 
"OnUnscheduled", "OnStopped"), events);
     }
+
+    @Test
+    @Timeout(value = 30, unit = TimeUnit.SECONDS)
+    public void testSessionCommitCallbacksCalledBeforeStopOnFailure() throws 
StatelessConfigurationException, IOException, InterruptedException {
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+        final VersionedProcessor generate = 
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+        final VersionedPort port = flowBuilder.createOutputPort("Out");
+        port.setPortFunction(PortFunction.FAILURE);
+
+        flowBuilder.createConnection(generate, port, "success");
+        final StatelessDataflow dataflow = 
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+        final DataflowTrigger trigger = dataflow.trigger();
+        final TriggerResult result = trigger.getResult();
+
+        assertFalse(result.isSuccessful());
+
+        assertCounters(dataflow, generate.getIdentifier(), 
RUNNING_AND_FAILURE);
+    }
+
+    private void assertCounters(final StatelessDataflow dataflow, final String 
componentId, final String positiveCounterName) {
+        for (final String counterName : new String[] {RUNNING_AND_SUCCESS, 
RUNNING_AND_FAILURE, STOPPED_AND_SUCCESS, STOPPED_AND_FAILURE}) {
+            final OptionalLong counterValue = dataflow.getCounter(componentId, 
counterName);
+
+            if (Objects.equals(counterName, positiveCounterName)) {
+                assertTrue(counterValue.isPresent(), "Expected Counter '" + 
counterName + "' to be present but it was not. Counters: " + 
dataflow.getCounters(Pattern.compile(".+")));
+                assertEquals(1, counterValue.getAsLong());
+            } else {
+                assertTrue(counterValue.isEmpty(), "Counter '" + counterName + 
"' has a value of " + counterValue);
+            }
+        }
+    }
+
+    @Test
+    @Timeout(value = 30, unit = TimeUnit.SECONDS)
+    public void testSessionCommitCallbacksCalledBeforeStopOnTimeout() throws 
StatelessConfigurationException, IOException, InterruptedException {
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+        final VersionedProcessor generate = 
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+        final VersionedProcessor sleep = 
flowBuilder.createSimpleProcessor("Sleep");
+        sleep.setProperties(Map.of("onTrigger Sleep Time", "1 min"));
+        sleep.setAutoTerminatedRelationships(Set.of("success"));
+
+        flowBuilder.createConnection(generate, sleep, "success");
+        final StatelessDataflow dataflow = 
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+        final DataflowTrigger trigger = dataflow.trigger();
+        final Optional<TriggerResult> optionalTriggerResult = 
trigger.getResult(100, TimeUnit.MILLISECONDS);
+        assertTrue(optionalTriggerResult.isEmpty());
+        trigger.cancel();
+
+        // Give it 2 seconds to finish its processing and make sure that we 
see no counters incremented.
+        // We expect no counters to be incremented because, even though we 
should see an attempt to increment
+        // the counter, the ProcessSession should have been terminated, 
disallowing the call to ProcessSession.adjustCounter()
+        Thread.sleep(2000L);
+        final Map<String, Long> counters = 
dataflow.getCounters(Pattern.compile(".+"));
+        assertTrue(counters.isEmpty(), "Expected no counters to be incremented 
but found: " + counters);
+    }
+
+    @Test
+    @Timeout(value = 30, unit = TimeUnit.SECONDS)
+    public void testSessionCommitCallbacksCalledBeforeStopOnSuccess() throws 
StatelessConfigurationException, IOException, InterruptedException {
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+        final VersionedProcessor generate = 
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+        final VersionedPort port = flowBuilder.createOutputPort("Out");
+        port.setPortFunction(PortFunction.STANDARD); // Being explicit that 
port should not be a failure port.
+
+        flowBuilder.createConnection(generate, port, "success");
+        final StatelessDataflow dataflow = 
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+        final DataflowTrigger trigger = dataflow.trigger();
+        final TriggerResult result = trigger.getResult();
+
+        assertTrue(result.isSuccessful());
+        result.acknowledge();
+
+        assertCounters(dataflow, generate.getIdentifier(), 
RUNNING_AND_SUCCESS);
+    }
+
+    @Test
+    @Timeout(value = 30, unit = TimeUnit.SECONDS)
+    public void 
testSessionCommitCallbacksCalledBeforeStopOnShutdownWhenProcessorDoesNotGracefullyFinish()
 throws StatelessConfigurationException, IOException, InterruptedException {
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+        final VersionedProcessor generate = 
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+        final VersionedProcessor sleep = 
flowBuilder.createSimpleProcessor("Sleep");
+        sleep.setProperties(Map.of("onTrigger Sleep Time", "1 min", "Stop 
Sleeping When Unscheduled", "false"));
+        sleep.setAutoTerminatedRelationships(Set.of("success"));
+
+        flowBuilder.createConnection(generate, sleep, "success");
+        final StatelessDataflow dataflow = 
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+        final DataflowTrigger trigger = dataflow.trigger();
+        final Optional<TriggerResult> optionalTriggerResult = 
trigger.getResult(100, TimeUnit.MILLISECONDS);
+        assertTrue(optionalTriggerResult.isEmpty());
+        dataflow.shutdown(false, true, Duration.ofMillis(1));
+
+        while (dataflow.getCounters(Pattern.compile(".+")).isEmpty()) {
+            Thread.sleep(100L);
+        }
+
+        assertCounters(dataflow, generate.getIdentifier(), 
RUNNING_AND_FAILURE);
+    }
+
+    @Test
+    @Timeout(value = 30, unit = TimeUnit.SECONDS)
+    public void 
testSessionCommitCallbacksCalledBeforeStopOnShutdownWhenProcessorFinishes() 
throws StatelessConfigurationException, IOException, InterruptedException {
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+        final VersionedProcessor generate = 
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+        final VersionedProcessor sleep = 
flowBuilder.createSimpleProcessor("Sleep");
+        sleep.setProperties(Map.of("onTrigger Sleep Time", "1 min", "Stop 
Sleeping When Unscheduled", "true"));
+        sleep.setAutoTerminatedRelationships(Set.of("success"));
+
+        flowBuilder.createConnection(generate, sleep, "success");
+        final StatelessDataflow dataflow = 
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+        final DataflowTrigger trigger = dataflow.trigger();
+        final Optional<TriggerResult> optionalTriggerResult = 
trigger.getResult(100, TimeUnit.MILLISECONDS);
+        assertTrue(optionalTriggerResult.isEmpty());
+        dataflow.shutdown(false, true, Duration.ofSeconds(5));
+
+        while (dataflow.getCounters(Pattern.compile(".+")).isEmpty()) {
+            Thread.sleep(100L);
+        }
+
+        assertCounters(dataflow, generate.getIdentifier(), 
RUNNING_AND_SUCCESS);
+    }
+
+    @Test
+    @Timeout(value = 30, unit = TimeUnit.SECONDS)
+    public void 
testSessionCommitCallbacksCalledBeforeStopOnShutdownWithTimeout() throws 
StatelessConfigurationException, IOException, InterruptedException {
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+        final VersionedProcessor generate = 
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+        final VersionedProcessor sleep = 
flowBuilder.createSimpleProcessor("Sleep");
+        sleep.setProperties(Map.of("onTrigger Sleep Time", "1 min",
+            "Ignore Interrupts", "true",
+            "Stop Sleeping When Unscheduled", "false"));
+        sleep.setAutoTerminatedRelationships(Set.of("success"));
+
+        flowBuilder.createConnection(generate, sleep, "success");
+        final StatelessDataflow dataflow = 
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+        final DataflowTrigger trigger = dataflow.trigger();
+        final Optional<TriggerResult> optionalTriggerResult = 
trigger.getResult(2, TimeUnit.SECONDS);
+        assertTrue(optionalTriggerResult.isEmpty());
+        dataflow.shutdown(false, true, Duration.ofMillis(1));
+
+        while (dataflow.getCounters(Pattern.compile(".+")).isEmpty()) {
+            Thread.sleep(100L);
+        }
+
+        assertCounters(dataflow, generate.getIdentifier(), 
RUNNING_AND_FAILURE);
+    }
+
 }
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateAndCountCallbacks.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateAndCountCallbacks.java
new file mode 100644
index 0000000000..9c8f7eb30a
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateAndCountCallbacks.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.time.Duration;
+import java.util.Set;
+
+@CapabilityDescription("Generates an empty FlowFile and counts how many times 
the session commit callback is called " +
+                       "for both success and failure before and after 
Processor is stopped")
+public class GenerateAndCountCallbacks extends AbstractProcessor {
+
+    public static final String STOPPED_AND_SUCCESS = "Stopped and Success";
+    public static final String STOPPED_AND_FAILURE = "Stopped and Failure";
+    public static final String RUNNING_AND_SUCCESS = "Running and Success";
+    public static final String RUNNING_AND_FAILURE = "Running and Failure";
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles are routed to this relationship")
+        .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Set.of(REL_SUCCESS);
+    }
+
+    private volatile boolean stopped = false;
+
+    @OnScheduled
+    public void onScheduled() {
+        stopped = false;
+        getLogger().info("Processor started");
+    }
+
+    @OnStopped
+    public void onStopped() {
+        stopped = true;
+        getLogger().info("Processor stopped");
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        session.transfer(flowFile, REL_SUCCESS);
+
+        session.commitAsync(() -> {
+            sleepUninterruptibly(Duration.ofSeconds(3));
+
+            if (stopped) {
+                session.adjustCounter(STOPPED_AND_SUCCESS, 1, true);
+                getLogger().error("Success callback called after Processor 
Stopped");
+            } else {
+                session.adjustCounter(RUNNING_AND_SUCCESS, 1, true);
+                getLogger().info("Success callback called while Processor 
Running");
+            }
+        },
+        failureCause -> {
+            sleepUninterruptibly(Duration.ofSeconds(3));
+
+            if (stopped) {
+                session.adjustCounter(STOPPED_AND_FAILURE, 1, true);
+                getLogger().error("Failure callback called after Processor 
Stopped; Failure cause: {}", failureCause.toString());
+            } else {
+                session.adjustCounter(RUNNING_AND_FAILURE, 1, true);
+                getLogger().warn("Failure callback called while Processor 
Running; Failure cause: {}", failureCause.toString());
+            }
+        });
+    }
+
+    /**
+     * Blocks the calling thread until the given duration has elapsed.
+     * Interrupts are deliberately ignored; sleeping resumes afterwards.
+     *
+     * @param duration the amount of time to block
+     */
+    private void sleepUninterruptibly(final Duration duration) {
+        final long endTime = System.currentTimeMillis() + duration.toMillis();
+
+        // Read the clock once per iteration. Checking the clock in the loop
+        // condition and reading it again for the sleep argument can produce
+        // a negative value if the deadline passes in between, and
+        // Thread.sleep() rejects negative values.
+        long remainingMillis = endTime - System.currentTimeMillis();
+        while (remainingMillis > 0) {
+            try {
+                Thread.sleep(remainingMillis);
+            } catch (final InterruptedException ignored) {
+                // Ignore interruption and keep sleeping for the full duration
+            }
+
+            remainingMillis = endTime - System.currentTimeMillis();
+        }
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Sleep.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Sleep.java
index 4edbd945e0..03427938d9 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Sleep.java
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Sleep.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.cs.tests.system.SleepService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -31,7 +32,6 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -73,6 +73,32 @@ public class Sleep extends AbstractProcessor {
         .required(false)
         .identifiesControllerService(SleepService.class)
         .build();
+    static final PropertyDescriptor IGNORE_INTERRUPTS = new Builder()
+        .name("Ignore Interrupts")
+        .description("If true, the processor will not respond to interrupts 
while sleeping.")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
+    static final PropertyDescriptor STOP_SLEEPING_WHEN_UNSCHEDULED = new 
Builder()
+        .name("Stop Sleeping When Unscheduled")
+        .description("If true, the processor will stop sleeping whenever the 
processor is unscheduled. " +
+            "If false, the processor will continue sleeping until the sleep 
time has elapsed. This property only applies to the " +
+            "onTrigger Sleep Time.")
+        .required(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
+
+    private static final List<PropertyDescriptor> properties = List.of(
+        VALIDATE_SLEEP_TIME,
+        ON_SCHEDULED_SLEEP_TIME,
+        ON_TRIGGER_SLEEP_TIME,
+        ON_STOPPED_SLEEP_TIME,
+        SLEEP_SERVICE,
+        IGNORE_INTERRUPTS,
+        STOP_SLEEPING_WHEN_UNSCHEDULED
+    );
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -80,46 +106,61 @@ public class Sleep extends AbstractProcessor {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(VALIDATE_SLEEP_TIME);
-        properties.add(ON_SCHEDULED_SLEEP_TIME);
-        properties.add(ON_TRIGGER_SLEEP_TIME);
-        properties.add(ON_STOPPED_SLEEP_TIME);
-        properties.add(SLEEP_SERVICE);
         return properties;
     }
 
     @Override
     public Set<Relationship> getRelationships() {
-        return Collections.singleton(REL_SUCCESS);
+        return Set.of(REL_SUCCESS);
     }
 
     @Override
     protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
         final long sleepMillis = 
validationContext.getProperty(VALIDATE_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
-        sleep(sleepMillis);
+        sleep(sleepMillis, isIgnoreInterrupt(validationContext), false);
 
         return Collections.emptyList();
     }
 
-    private void sleep(final long millis) {
-        if (millis > 0L) {
+    /**
+     * Sleeps for the given time in slices of at most 50 milliseconds.
+     *
+     * @param millis total time to sleep; zero or less returns immediately
+     * @param ignoreInterrupts if true, an interrupt does not end the sleep;
+     *                         otherwise the interrupt flag is restored and
+     *                         the method returns
+     * @param stopSleepOnUnscheduled if true, return as soon as
+     *                               isScheduled() reports false
+     */
+    private void sleep(final long millis, final boolean ignoreInterrupts, 
final boolean stopSleepOnUnscheduled) {
+        if (millis <= 0L) {
+            return;
+        }
+
+        final long stopTime = System.currentTimeMillis() + millis;
+        while (System.currentTimeMillis() < stopTime) {
+            if (stopSleepOnUnscheduled && !isScheduled()) {
+                return;
+            }
+
+            // Sleep for up to the stopTime but no more than 50 milliseconds 
at a time. This gives us a chance
+            // to periodically check if the processor is still scheduled.
+            // Clamp at zero: the clock may pass stopTime between the loop
+            // check and this subtraction, and Thread.sleep() throws
+            // IllegalArgumentException for a negative argument.
+            final long remainingMillis = stopTime - System.currentTimeMillis();
+            final long sleepMillis = Math.max(0L, Math.min(remainingMillis, 50L));
             try {
-                Thread.sleep(millis);
-            } catch (final InterruptedException ie) {
+                Thread.sleep(sleepMillis);
+            } catch (final InterruptedException e) {
+                if (ignoreInterrupts) {
+                    continue;
+                }
+
                 Thread.currentThread().interrupt();
+                return;
             }
         }
     }
 
     @OnScheduled
     public void onEnabled(final ProcessContext context) {
-        
sleep(context.getProperty(ON_SCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
+        
sleep(context.getProperty(ON_SCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
 isIgnoreInterrupt(context), false);
+    }
+
+    private boolean isIgnoreInterrupt(final PropertyContext context) {
+        return context.getProperty(IGNORE_INTERRUPTS).asBoolean();
     }
 
     @OnStopped
     public void onDisabled(final ProcessContext context) {
-        
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
+        
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
 isIgnoreInterrupt(context), false);
     }
 
 
@@ -131,13 +172,16 @@ public class Sleep extends AbstractProcessor {
         }
 
         final long sleepMillis = 
context.getProperty(ON_TRIGGER_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
-        sleep(sleepMillis);
+        final boolean ignoreInterrupts = isIgnoreInterrupt(context);
+        final boolean stopSleepOnUnscheduled = 
context.getProperty(STOP_SLEEPING_WHEN_UNSCHEDULED).asBoolean();
+        sleep(sleepMillis, ignoreInterrupts, stopSleepOnUnscheduled);
 
         final SleepService service = 
context.getProperty(SLEEP_SERVICE).asControllerService(SleepService.class);
         if (service != null) {
             service.sleep();
         }
 
+        getLogger().info("Finished onTrigger sleep");
         session.transfer(flowFile, REL_SUCCESS);
     }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 91c2062c38..9df0ad1a4d 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -27,6 +27,7 @@ 
org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect
 org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
 org.apache.nifi.processors.tests.system.FakeProcessor
 org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
+org.apache.nifi.processors.tests.system.GenerateAndCountCallbacks
 org.apache.nifi.processors.tests.system.GenerateFlowFile
 org.apache.nifi.processors.tests.system.HoldInput
 org.apache.nifi.processors.tests.system.IngestFile
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
index 77020c914b..780438ca2c 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
@@ -533,7 +533,7 @@ public class StatelessBasicsIT extends NiFiSystemIT {
 
     @Test
     public void testStopGroupMakesFlowFileAvailable() throws 
NiFiClientException, IOException, InterruptedException {
-        createFlowShell();
+        createFlowShell("3 sec");
 
         // Add a sleep for 1 min
         final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", 
statelessGroup.getId());

Reply via email to