This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1be32d6992 NIFI-14654 Fixed Stateless Process Group session lifecycle
handling (#10014)
1be32d6992 is described below
commit 1be32d6992acf40bc63f69a57cf8b72eedc42cd7
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jun 13 15:44:44 2025 -0400
NIFI-14654 Fixed Stateless Process Group session lifecycle handling (#10014)
Ensure when a Stateless Process Group is stopped that we properly call all
success and/or failure session callbacks before stopping the Processor. In
order to make system tests to verify this more straight-forward, also exposed
the CounterRepository such that system tests can obtain counter values to
ensure that methods are called appropriately.
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/controller/StandardProcessorNode.java | 103 +++++----
.../apache/nifi/groups/StandardProcessGroup.java | 21 +-
.../apache/nifi/controller/ProcessScheduler.java | 6 +-
.../org/apache/nifi/controller/ProcessorNode.java | 17 +-
.../nifi/controller/scheduling/LifecycleState.java | 17 +-
.../java/org/apache/nifi/groups/ProcessGroup.java | 13 +-
.../lifecycle/ProcessorStopLifecycleMethods.java | 61 ++++++
.../flow/StandardStatelessGroupNodeFactory.java | 7 +-
.../scheduling/StandardProcessScheduler.java | 29 ++-
.../nifi/groups/StandardStatelessGroupNode.java | 6 +-
.../scheduling/TestStandardProcessScheduler.java | 3 +-
.../controller/service/mock/MockProcessGroup.java | 3 +-
.../nifi/stateless/flow/StatelessDataflow.java | 17 +-
.../scheduling/StatelessProcessScheduler.java | 39 +++-
.../nifi/stateless/engine/ExecutionProgress.java | 7 +-
.../engine/StandardExecutionProgress.java | 9 +-
.../stateless/engine/StandardStatelessEngine.java | 45 ++--
.../flow/StandardStatelessDataflowFactory.java | 8 +-
.../nifi/stateless/flow/StandardStatelessFlow.java | 234 +++++++++++++--------
.../repository/RepositoryContextFactory.java | 3 +
.../StatelessRepositoryContextFactory.java | 7 +-
.../stateless/session/StatelessProcessSession.java | 6 +-
.../apache/nifi/stateless/StatelessSystemIT.java | 3 +-
.../stateless/basics/ProcessorLifecycleIT.java | 172 ++++++++++++++-
.../tests/system/GenerateAndCountCallbacks.java | 105 +++++++++
.../apache/nifi/processors/tests/system/Sleep.java | 76 +++++--
.../services/org.apache.nifi.processor.Processor | 1 +
.../tests/system/stateless/StatelessBasicsIT.java | 2 +-
28 files changed, 790 insertions(+), 230 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 1eb7a05dee..a1ac2a1205 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -56,6 +56,7 @@ import
org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.tasks.ActiveTask;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepositoryFactory;
@@ -92,6 +93,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.support.CronExpression;
+import java.lang.annotation.Annotation;
import java.lang.management.ThreadInfo;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -939,7 +941,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
// When getScheduledState() is called, we map the 'physical' state of
STOPPING to STOPPED. This is done in order to maintain
// backward compatibility because the UI and other clients will not
know of the (relatively newer) 'STOPPING' state.
- // Because of there previously was no STOPPING state, the way to
determine of a processor had truly stopped was to check if its
+ // Because there previously was no STOPPING state, the way to
determine of a processor had truly stopped was to check if its
// Scheduled State was STOPPED AND it had no active threads.
//
// Also, we can have a situation in which a processor is started while
invalid. Before the processor becomes valid, it can be stopped.
@@ -1800,18 +1802,24 @@ public class StandardProcessorNode extends
ProcessorNode implements Connectable
*/
@Override
public CompletableFuture<Void> stop(final ProcessScheduler
processScheduler, final ScheduledExecutorService executor, final ProcessContext
processContext,
- final SchedulingAgent schedulingAgent, final LifecycleState
lifecycleState, final boolean triggerLifecycleMethods) {
+ final SchedulingAgent schedulingAgent, final LifecycleState
lifecycleState, final ProcessorStopLifecycleMethods lifecycleMethods) {
final Processor processor = processorRef.get().getProcessor();
LOG.debug("Stopping processor: {}", this);
setDesiredState(ScheduledState.STOPPED);
final CompletableFuture<Void> future = new CompletableFuture<>();
-
addStopFuture(future);
// will ensure that the Processor represented by this node can only be
stopped once
if (this.scheduledState.compareAndSet(ScheduledState.RUNNING,
ScheduledState.STOPPING) ||
this.scheduledState.compareAndSet(ScheduledState.RUN_ONCE,
ScheduledState.STOPPING)) {
+ if (!lifecycleMethods.isTriggerOnUnscheduled() &&
!lifecycleMethods.isTriggerOnStopped()) {
+ // If we do not need to trigger either of the lifecycle
methods, we can simply call the complete action and be done
+ completeStopAction();
+ future.complete(null);
+ return future;
+ }
+
lifecycleState.incrementActiveThreadCount(null);
// will continue to monitor active threads, invoking OnStopped
once there are no
@@ -1823,17 +1831,11 @@ public class StandardProcessorNode extends
ProcessorNode implements Connectable
if (lifecycleState.isScheduled()) {
schedulingAgent.unschedule(StandardProcessorNode.this, lifecycleState);
- if (triggerLifecycleMethods) {
+ if (lifecycleMethods.isTriggerOnUnscheduled()) {
LOG.debug("Triggering @OnUnscheduled methods
of {}", this);
-
- activateThread();
- try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(getExtensionManager(),
processor.getClass(), processor.getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class,
processor, processContext);
- } finally {
- deactivateThread();
- }
+ triggerOnUnscheduled(processContext);
} else {
- LOG.debug("Will not trigger @OnUnscheduled
methods of {} because triggerLifecycleState = false", this);
+ LOG.debug("Will not trigger @OnUnscheduled
methods of {} because ProcessorStopLifecycleMethods.isTriggerOnUnscheduled() =
false", this);
}
}
@@ -1841,38 +1843,35 @@ public class StandardProcessorNode extends
ProcessorNode implements Connectable
// performing the lifecycle actions counts as 1 thread.
final boolean allThreadsComplete =
lifecycleState.getActiveThreadCount() == 1;
if (allThreadsComplete) {
- if (triggerLifecycleMethods) {
- LOG.debug("Triggering @OnStopped methods of
{}", this);
-
- activateThread();
- try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(getExtensionManager(),
processor.getClass(), processor.getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor,
processContext);
- } finally {
- deactivateThread();
- }
- } else {
- LOG.debug("Will not trigger @OnStopped methods
of {} because triggerLifecycleState = false", this);
- }
+ try {
+ if (lifecycleMethods.isTriggerOnStopped()) {
+ LOG.debug("Triggering @OnStopped methods
of {}", this);
- lifecycleState.decrementActiveThreadCount();
- completeStopAction();
-
- // This can happen only when we join a cluster. In
such a case, we can inherit a flow from the cluster that says that
- // the Processor is to be running. However, if the
Processor is already in the process of stopping, we cannot immediately
- // start running the Processor. As a result, we
check here, since the Processor is stopped, and then immediately start the
- // Processor if need be.
- final ScheduledState desired =
StandardProcessorNode.this.desiredState;
- if (desired == ScheduledState.RUNNING) {
- LOG.info("Finished stopping {} but desired
state is now RUNNING so will start processor", this);
-
getProcessGroup().startProcessor(StandardProcessorNode.this, true);
- } else if (desired == ScheduledState.DISABLED) {
- final boolean updated =
scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
-
- if (updated) {
- LOG.info("Finished stopping {} but desired
state is now DISABLED so disabled processor", this);
+ triggerOnStopped(processContext);
} else {
- LOG.info("Finished stopping {} but desired
state is now DISABLED. Scheduled State could not be transitioned from STOPPED
to DISABLED, "
- + "though, so will allow the other
thread to finish state transition. Current state is {}", this,
scheduledState.get());
+ LOG.debug("Will not trigger @OnStopped
methods of {} because ProcessorStopLifecycleMethods.isTriggerOnStopped() =
false", this);
+ }
+ } finally {
+ lifecycleState.decrementActiveThreadCount();
+ completeStopAction();
+
+ // This can happen only when we join a
cluster. In such a case, we can inherit a flow from the cluster that says that
+ // the Processor is to be running. However, if
the Processor is already in the process of stopping, we cannot immediately
+ // start running the Processor. As a result,
we check here, since the Processor is stopped, and then immediately start the
+ // Processor if need be.
+ final ScheduledState desired =
StandardProcessorNode.this.desiredState;
+ if (desired == ScheduledState.RUNNING) {
+ LOG.info("Finished stopping {} but desired
state is now RUNNING so will start processor", this);
+
getProcessGroup().startProcessor(StandardProcessorNode.this, true);
+ } else if (desired == ScheduledState.DISABLED)
{
+ final boolean updated =
scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
+
+ if (updated) {
+ LOG.info("Finished stopping {} but
desired state is now DISABLED so disabled processor", this);
+ } else {
+ LOG.info("Finished stopping {} but
desired state is now DISABLED. Scheduled State could not be transitioned from
STOPPED to DISABLED, "
+ + "though, so will allow the
other thread to finish state transition. Current state is {}", this,
scheduledState.get());
+ }
}
}
} else {
@@ -1900,6 +1899,26 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
return future;
}
+ @Override
+ public void triggerOnUnscheduled(final ProcessContext processContext) {
+ triggerLifecycleMethod(processContext, OnUnscheduled.class);
+ }
+
+ private void triggerOnStopped(final ProcessContext processContext) {
+ triggerLifecycleMethod(processContext, OnStopped.class);
+ }
+
+ private void triggerLifecycleMethod(final ProcessContext processContext,
final Class<? extends Annotation> lifecycleAnnotation) {
+ final Processor processor = processorRef.get().getProcessor();
+
+ activateThread();
+ try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(getExtensionManager(),
processor.getClass(), processor.getIdentifier())) {
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(lifecycleAnnotation,
processor, processContext);
+ } finally {
+ deactivateThread();
+ }
+ }
+
/**
* Marks the processor as fully stopped, and completes any futures that
are to be completed as a result
*/
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index b49d52d113..426b30c2cf 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -69,6 +69,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import
org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer;
import
org.apache.nifi.flow.synchronization.VersionedFlowSynchronizationContext;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
@@ -588,14 +589,15 @@ public final class StandardProcessGroup implements
ProcessGroup {
}
@Override
- public CompletableFuture<Void> stopComponents() {
+ public CompletableFuture<Void> stopComponents(final
ProcessorStopLifecycleMethods processorStopLifecycleMethods) {
readLock.lock();
try {
final List<CompletableFuture<Void>> futures = new ArrayList<>();
getProcessors().stream().filter(STOP_PROCESSORS_FILTER).forEach(node -> {
try {
- futures.add(node.getProcessGroup().stopProcessor(node));
+ final StandardProcessGroup immediateGroup =
(StandardProcessGroup) node.getProcessGroup();
+ futures.add(immediateGroup.stopProcessor(node,
processorStopLifecycleMethods));
} catch (final Throwable t) {
LOG.error("Unable to stop processor {}",
node.getIdentifier(), t);
}
@@ -1752,7 +1754,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
processor.reloadAdditionalResourcesIfNecessary();
return scheduler.runProcessorOnce(processor, stopCallback);
- } catch (Exception e) {
+ } catch (final Exception e) {
processor.getLogger().error("Error while running processor {}
once.", processor, e);
return stopProcessor(processor);
} finally {
@@ -1826,6 +1828,17 @@ public final class StandardProcessGroup implements
ProcessGroup {
@Override
public CompletableFuture<Void> stopProcessor(final ProcessorNode
processor) {
+ // When using Stateless Engine, we do not want to trigger lifecycle
methods because the Stateless engine will create N
+ // ProcessorNode's, one for each Concurrent Task and use those.
Therefore, we do not want to trigger any lifecycle events
+ // on this ProcessorNode object, but we do need to call
stopProcessor() to ensure that we keep appropriate state
+ // about whether the Processor is scheduled, etc.
+ final ProcessorStopLifecycleMethods lifecycleMethods =
resolveExecutionEngine() == ExecutionEngine.STATELESS
+ ? ProcessorStopLifecycleMethods.TRIGGER_NONE :
ProcessorStopLifecycleMethods.TRIGGER_ALL;
+
+ return stopProcessor(processor, lifecycleMethods);
+ }
+
+ private CompletableFuture<Void> stopProcessor(final ProcessorNode
processor, final ProcessorStopLifecycleMethods lifecycleMethods) {
readLock.lock();
try {
if (!processors.containsKey(processor.getIdentifier())) {
@@ -1837,7 +1850,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
throw new IllegalStateException("Processor is disabled");
}
- return scheduler.stopProcessor(processor);
+ return scheduler.stopProcessor(processor, lifecycleMethods);
} finally {
readLock.unlock();
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index 416b774ee5..2df3cb477a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -22,6 +22,7 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.StatelessGroupNode;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
@@ -80,9 +81,10 @@ public interface ProcessScheduler {
* interrupt any threads that are currently running within the given
* Processor. If the Processor is not scheduled to run, does nothing.
*
- * @param procNode to stop
+ * @param procNode the Processor to stop
+ * @param lifecycleMethods the lifecycle methods to invoke when stopping
the processor
*/
- CompletableFuture<Void> stopProcessor(ProcessorNode procNode);
+ CompletableFuture<Void> stopProcessor(ProcessorNode procNode,
ProcessorStopLifecycleMethods lifecycleMethods);
/**
* Interrupts all threads that are currently active in the Processor in an
attempt to
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index be53eabf81..7286ebd964 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.validation.ValidationStatus;
@@ -25,6 +26,7 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.migration.ControllerServiceFactory;
@@ -240,11 +242,20 @@ public abstract class ProcessorNode extends
AbstractComponentNode implements Con
* @param scheduleState
* the ScheduleState that can be used to ensure that the
running state (STOPPED, RUNNING, etc.)
* as well as the active thread counts are kept in sync
- * @param triggerLifecycleMethods
- * whether or not to trigger lifecycle methods such as
@OnScheduled, @OnStopped, etc.
+ * @param lifecycleMethods
+ * Indicates which lifecycle methods should be triggered to run
*/
public abstract CompletableFuture<Void> stop(ProcessScheduler
processScheduler, ScheduledExecutorService executor,
- ProcessContext processContext, SchedulingAgent schedulingAgent,
LifecycleState scheduleState, boolean triggerLifecycleMethods);
+ ProcessContext processContext, SchedulingAgent schedulingAgent,
LifecycleState scheduleState, ProcessorStopLifecycleMethods lifecycleMethods);
+
+ /**
+ * Triggers any method with the {@link OnUnscheduled} annotation on the
Processor. This does not stop scheduling the Processor, change any state
+ * of the ProcessorNode, or invoke any other lifecycle methods. This
method should be used only in order to signal to a Processor that it may
+ * perform any necessary duties that are associated with no longer being
scheduled, such as proactively ending an onTrigger cycle.
+ *
+ * @param processContext the ProcessContext that may be provided to the
methods annotated with {@link OnUnscheduled}.
+ */
+ public abstract void triggerOnUnscheduled(ProcessContext processContext);
/**
* Marks all active tasks as terminated and interrupts all active threads
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
index fa7312eac3..a3095a86ea 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
@@ -19,6 +19,8 @@ package org.apache.nifi.controller.scheduling;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.controller.repository.ActiveProcessSessionFactory;
import org.apache.nifi.processor.exception.TerminatedTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@@ -31,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class LifecycleState {
+ private static final Logger logger =
LoggerFactory.getLogger(LifecycleState.class);
private final Object componentId;
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
@@ -74,7 +77,12 @@ public class LifecycleState {
activeProcessSessionFactories.put(sessionFactory, null);
}
- return activeThreadCount.incrementAndGet();
+ final int updatedThreadCount = activeThreadCount.incrementAndGet();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Active Thread Count for {} incremented to {}",
componentId, updatedThreadCount, new RuntimeException("Stack Trace for
Debugging"));
+ }
+
+ return updatedThreadCount;
}
public synchronized int decrementActiveThreadCount() {
@@ -82,7 +90,12 @@ public class LifecycleState {
return activeThreadCount.get();
}
- return activeThreadCount.decrementAndGet();
+ final int updatedThreadCount = activeThreadCount.decrementAndGet();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Active Thread Count for {} decremented to {}",
componentId, updatedThreadCount, new RuntimeException("Stack Trace for
Debugging"));
+ }
+
+ return updatedThreadCount;
}
public int getActiveThreadCount() {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 4a53e453f2..1dd9544985 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -35,6 +35,7 @@ import
org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.processor.Processor;
@@ -177,7 +178,17 @@ public interface ProcessGroup extends
ComponentAuthorizable, Positionable, Versi
* Stops all Processors, Local Ports, and Funnels that are directly within
* this group and child ProcessGroups, except for those that are disabled.
*/
- CompletableFuture<Void> stopComponents();
+ default CompletableFuture<Void> stopComponents() {
+ return stopComponents(ProcessorStopLifecycleMethods.TRIGGER_ALL);
+ }
+
+ /**
+ * Stops all Processors, Local Ports, and Funnels that are directly within
+ * this group and child ProcessGroups, except for those that are disabled.
+ *
+ * @param processorStopLifecycleMethods indicates which lifecycle methods
should be called when stopping Processors
+ */
+ CompletableFuture<Void> stopComponents(ProcessorStopLifecycleMethods
processorStopLifecycleMethods);
/**
* @return the scheduled state for this ProcessGroup, or
StatelessGroupScheduledState.STOPPED
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/lifecycle/ProcessorStopLifecycleMethods.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/lifecycle/ProcessorStopLifecycleMethods.java
new file mode 100644
index 0000000000..8c9081e17f
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/lifecycle/ProcessorStopLifecycleMethods.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lifecycle;
+
+public interface ProcessorStopLifecycleMethods {
+
+ boolean isTriggerOnUnscheduled();
+
+ boolean isTriggerOnStopped();
+
+ ProcessorStopLifecycleMethods TRIGGER_ALL = new
ProcessorStopLifecycleMethods() {
+ @Override
+ public boolean isTriggerOnUnscheduled() {
+ return true;
+ }
+
+ @Override
+ public boolean isTriggerOnStopped() {
+ return true;
+ }
+ };
+
+ ProcessorStopLifecycleMethods TRIGGER_NONE = new
ProcessorStopLifecycleMethods() {
+ @Override
+ public boolean isTriggerOnUnscheduled() {
+ return false;
+ }
+
+ @Override
+ public boolean isTriggerOnStopped() {
+ return false;
+ }
+ };
+
+ ProcessorStopLifecycleMethods TRIGGER_ONSTOPPED = new
ProcessorStopLifecycleMethods() {
+ @Override
+ public boolean isTriggerOnUnscheduled() {
+ return false;
+ }
+
+ @Override
+ public boolean isTriggerOnStopped() {
+ return true;
+ }
+ };
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
index 1d9aae1344..3658b74763 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
@@ -70,7 +70,6 @@ import org.apache.nifi.stateless.engine.StatelessFlowManager;
import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
-import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
import java.time.Duration;
@@ -119,7 +118,6 @@ public class StandardStatelessGroupNodeFactory implements
StatelessGroupNodeFact
final FlowFileRepository underlyingFlowFileRepository =
flowController.getRepositoryContextFactory().getFlowFileRepository();
final StatelessFlowFileRepository flowFileRepository = new
StatelessBridgeFlowFileRepository(underlyingFlowFileRepository,
resourceClaimManager);
- final StatelessProvenanceRepository statelessProvenanceRepository =
new StatelessProvenanceRepository(1_000);
flowFileRepository.initialize(resourceClaimManager);
final ContentRepository contentRepository = new
NonPurgeableContentRepository(flowController.getRepositoryContextFactory().getContentRepository());
@@ -128,7 +126,6 @@ public class StandardStatelessGroupNodeFactory implements
StatelessGroupNodeFact
flowFileRepository,
flowController.getFlowFileEventRepository(),
flowController.getCounterRepository(),
- statelessProvenanceRepository,
flowController.getStateManagerProvider());
final FlowMappingOptions flowMappingOptions = new
FlowMappingOptions.Builder()
@@ -226,7 +223,8 @@ public class StandardStatelessGroupNodeFactory implements
StatelessGroupNodeFact
}
};
- final StatelessProcessScheduler statelessScheduler = new
StatelessProcessScheduler(flowController.getExtensionManager(), Duration.of(10,
ChronoUnit.SECONDS));
+ final StatelessProcessScheduler statelessScheduler = new
StatelessProcessScheduler(flowController.getExtensionManager(),
+ flowController.getLifecycleStateManager(), Duration.of(10,
ChronoUnit.SECONDS));
final StatelessStateManagerProvider stateManagerProvider = new
StatelessStateManagerProvider();
final StatelessEngine statelessEngine = new
StandardStatelessEngine.Builder()
@@ -242,6 +240,7 @@ public class StandardStatelessGroupNodeFactory implements
StatelessGroupNodeFact
.stateManagerProvider(stateManagerProvider)
.kerberosConfiguration(kerberosConfig)
.statusTaskInterval(null)
+ .lifecycleStateManager(flowController.getLifecycleStateManager())
.build();
final BulletinRepository bulletinRepository =
flowController.getBulletinRepository();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 6401ae0375..1a7a7dbb57 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -45,6 +45,7 @@ import
org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.groups.StatelessGroupNode;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.NarCloseable;
@@ -544,21 +545,20 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
/**
* Stops the given {@link Processor} by invoking its
- * {@link ProcessorNode#stop(ProcessScheduler, ScheduledExecutorService,
ProcessContext, SchedulingAgent, LifecycleState, boolean)}
+ * {@link ProcessorNode#stop(ProcessScheduler, ScheduledExecutorService,
ProcessContext, SchedulingAgent, LifecycleState, ProcessorStopLifecycleMethods)}
* method.
*
- * @see StandardProcessorNode#stop(ProcessScheduler,
ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState,
boolean)
+ * @see StandardProcessorNode#stop(ProcessScheduler,
ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState,
ProcessorStopLifecycleMethods)
*/
@Override
- public synchronized CompletableFuture<Void> stopProcessor(final
ProcessorNode procNode) {
+ public synchronized CompletableFuture<Void> stopProcessor(final
ProcessorNode procNode, final ProcessorStopLifecycleMethods lifecycleMethods) {
final LifecycleState lifecycleState = getLifecycleState(procNode,
false, false);
- StandardProcessContext processContext = new
StandardProcessContext(procNode, getControllerServiceProvider(),
+ final StandardProcessContext processContext = new
StandardProcessContext(procNode, getControllerServiceProvider(),
getStateManager(procNode.getIdentifier()),
lifecycleState::isTerminated, flowController);
- final boolean triggerLifecycleMethods =
procNode.getProcessGroup().resolveExecutionEngine() !=
ExecutionEngine.STATELESS;
LOG.info("Stopping {}", procNode);
- return procNode.stop(this, this.componentLifeCycleThreadPool,
processContext, getSchedulingAgent(procNode), lifecycleState,
triggerLifecycleMethods);
+ return procNode.stop(this, this.componentLifeCycleThreadPool,
processContext, getSchedulingAgent(procNode), lifecycleState, lifecycleMethods);
}
@Override
@@ -844,8 +844,23 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
}
private CompletableFuture<Void> enableControllerService(final
ControllerServiceNode service, final boolean completeFutureExceptionally) {
+ if (service.isActive()) {
+ LOG.debug("{} is already active, so not enabling it again",
service);
+ return CompletableFuture.completedFuture(null);
+ }
+
LOG.info("Enabling {}", service);
- return service.enable(this.componentLifeCycleThreadPool,
this.administrativeYieldMillis, completeFutureExceptionally);
+
+ final List<CompletableFuture<Void>> futures = new ArrayList<>();
+ final List<ControllerServiceNode> dependentServices =
service.getRequiredControllerServices();
+ for (final ControllerServiceNode dependentService : dependentServices)
{
+ // Enable Controller Service but if it fails, do not complete the
future Exceptionally. This allows us to wait up until the
+ // timeout for the service to enable, even if it needs to retry in
order to do so.
+ futures.add(enableControllerService(dependentService,
completeFutureExceptionally));
+ }
+
+ futures.add(service.enable(this.componentLifeCycleThreadPool,
this.administrativeYieldMillis, completeFutureExceptionally));
+ return CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0]));
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
index 9ef1fe9103..6f0f96723d 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
@@ -43,6 +43,7 @@ import
org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.tasks.StatelessFlowTask;
import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.ProcessContext;
@@ -329,7 +330,7 @@ public class StandardStatelessGroupNode implements
StatelessGroupNode {
.transactionThresholds(TransactionThresholds.UNLIMITED)
.build();
- final ProcessScheduler processScheduler = new
StatelessProcessScheduler(extensionManager, Duration.of(10,
ChronoUnit.SECONDS)) {
+ final ProcessScheduler processScheduler = new
StatelessProcessScheduler(extensionManager, lifecycleStateManager,
Duration.of(10, ChronoUnit.SECONDS)) {
@Override
public void yield(final ProcessorNode procNode) {
if (isSourceProcessor(procNode)) {
@@ -461,7 +462,7 @@ public class StandardStatelessGroupNode implements
StatelessGroupNode {
final List<CompletableFuture<Void>> futures = new ArrayList<>();
for (final ProcessorNode procNode :
getProcessGroup().findAllProcessors()) {
- final CompletableFuture<Void> stopProcessorFuture =
scheduler.stopProcessor(procNode);
+ final CompletableFuture<Void> stopProcessorFuture =
scheduler.stopProcessor(procNode, ProcessorStopLifecycleMethods.TRIGGER_NONE);
futures.add(stopProcessorFuture);
}
@@ -482,7 +483,6 @@ public class StandardStatelessGroupNode implements
StatelessGroupNode {
}
-
private void stopPorts() {
final List<Port> allPorts = new ArrayList<>();
allPorts.addAll(getProcessGroup().findAllInputPorts());
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 250dd5d67d..bcbf5b375a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -52,6 +52,7 @@ import
org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.service.mock.MockProcessGroup;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
@@ -272,7 +273,7 @@ public class TestStandardProcessScheduler {
Thread.sleep(25L);
- scheduler.stopProcessor(procNode);
+ scheduler.stopProcessor(procNode,
ProcessorStopLifecycleMethods.TRIGGER_ALL);
assertTrue(service.isActive());
assertSame(ControllerServiceState.ENABLING, service.getState());
scheduler.disableControllerService(service).get();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index f96964bf44..9000e35aa3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -48,6 +48,7 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.StatelessGroupNode;
import org.apache.nifi.groups.StatelessGroupScheduledState;
import org.apache.nifi.groups.VersionedComponentAdditions;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.registry.flow.FlowLocation;
@@ -163,7 +164,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
- public CompletableFuture<Void> stopComponents() {
+ public CompletableFuture<Void> stopComponents(final
ProcessorStopLifecycleMethods processorStopLifecycleMethods) {
return CompletableFuture.completedFuture(null);
}
diff --git
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index dc4a26dca5..9ae5efc882 100644
---
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -24,7 +24,9 @@ import org.apache.nifi.reporting.BulletinRepository;
import java.io.InputStream;
import java.time.Duration;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
+import java.util.regex.Pattern;
public interface StatelessDataflow {
/**
@@ -92,22 +94,13 @@ public interface StatelessDataflow {
boolean isFlowFileQueued();
- /**
- *
- * @return True if there are any processors in the dataflow with the
{@link org.apache.nifi.annotation.behavior.Stateful} annotation
- */
- boolean isStateful();
-
void purge();
Map<String, String> getComponentStates(Scope scope);
- void setComponentStates(Map<String, String> componentStates, Scope scope);
-
- boolean isSourcePrimaryNodeOnly();
-
- long getSourceYieldExpiration();
-
BulletinRepository getBulletinRepository();
+ OptionalLong getCounter(String componentId, String counterName);
+
+ Map<String, Long> getCounters(Pattern counterNamePattern);
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index 9ec55ddfb5..555c1373ea 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -21,19 +21,20 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
-import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.groups.StatelessGroupNode;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
@@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
@@ -65,6 +67,7 @@ public class StatelessProcessScheduler implements
ProcessScheduler {
private final SchedulingAgent schedulingAgent;
private final ExtensionManager extensionManager;
+ private final LifecycleStateManager lifecycleStateManager;
private FlowEngine componentLifeCycleThreadPool;
private ScheduledExecutorService componentMonitoringThreadPool;
@@ -74,8 +77,9 @@ public class StatelessProcessScheduler implements
ProcessScheduler {
private final long processorStartTimeoutMillis;
- public StatelessProcessScheduler(final ExtensionManager extensionManager,
final Duration processorStartTimeout) {
+ public StatelessProcessScheduler(final ExtensionManager extensionManager,
final LifecycleStateManager lifecycleStateManager, final Duration
processorStartTimeout) {
this.extensionManager = extensionManager;
+ this.lifecycleStateManager = lifecycleStateManager;
this.processorStartTimeoutMillis = processorStartTimeout.toMillis();
schedulingAgent = new StatelessSchedulingAgent(extensionManager);
}
@@ -155,17 +159,31 @@ public class StatelessProcessScheduler implements
ProcessScheduler {
}
@Override
- public CompletableFuture<Void> stopProcessor(final ProcessorNode procNode)
{
+ public CompletableFuture<Void> stopProcessor(final ProcessorNode procNode,
final ProcessorStopLifecycleMethods lifecycleMethods) {
logger.info("Stopping {}", procNode);
final ProcessContext processContext =
processContextFactory.createProcessContext(procNode);
- final LifecycleState lifecycleState = new
LifecycleState(procNode.getIdentifier());
- final boolean scheduled = procNode.getScheduledState() ==
ScheduledState.RUNNING || procNode.getActiveThreadCount() > 0;
- lifecycleState.setScheduled(scheduled);
- return procNode.stop(this, this.componentLifeCycleThreadPool,
processContext, schedulingAgent, lifecycleState, true);
+ final LifecycleState lifecycleState =
lifecycleStateManager.getOrRegisterLifecycleState(procNode.getIdentifier(),
false, false);
+ return procNode.stop(this, this.componentLifeCycleThreadPool,
processContext, schedulingAgent, lifecycleState, lifecycleMethods);
}
@Override
public void terminateProcessor(final ProcessorNode procNode) {
+ final Optional<LifecycleState> optionalLifecycleState =
lifecycleStateManager.getLifecycleState(procNode.getIdentifier());
+ if (optionalLifecycleState.isEmpty()) {
+ logger.debug("No Lifecycle State registered for {} so will not
terminate", procNode);
+ return;
+ }
+
+ final LifecycleState lifecycleState = optionalLifecycleState.get();
+ final int activeThreadCount = lifecycleState.getActiveThreadCount();
+ if (activeThreadCount == 0) {
+ logger.debug("No active threads registered for {} so will not
terminate", procNode);
+ return;
+ }
+
+ lifecycleState.terminate();
+ final int terminationCounts = procNode.terminate();
+ logger.info("Terminated {} with {} active threads", procNode,
terminationCounts);
}
@Override
@@ -243,7 +261,12 @@ public class StatelessProcessScheduler implements
ProcessScheduler {
@Override
public int getActiveThreadCount(final Object scheduled) {
- return 0;
+ if (!(scheduled instanceof final Connectable connectable)) {
+ return 0;
+ }
+
+ final Optional<LifecycleState> optionalLifecycleState =
lifecycleStateManager.getLifecycleState(connectable.getIdentifier());
+ return
optionalLifecycleState.map(LifecycleState::getActiveThreadCount).orElse(0);
}
@Override
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ExecutionProgress.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ExecutionProgress.java
index 21dde30ec9..748815512c 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ExecutionProgress.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ExecutionProgress.java
@@ -64,12 +64,7 @@ public interface ExecutionProgress {
*/
void notifyExecutionFailed(Throwable cause);
- /**
- * Indicates whether or not the port with the given name is considered a
Failure Port
- * @param portName the name of the port
- * @return <code>true</code> if the port is a failure port,
<code>false</code> otherwise
- */
- boolean isFailurePort(String portName);
+ boolean isFailurePort(Connectable connectable);
boolean isTerminalPort(Connectable connectable);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
index 24fa2ca4b5..247c4efdc1 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
@@ -17,6 +17,7 @@
package org.apache.nifi.stateless.engine;
+import org.apache.nifi.components.PortFunction;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@@ -89,8 +90,12 @@ public class StandardExecutionProgress implements
ExecutionProgress {
}
@Override
- public boolean isFailurePort(final String portName) {
- return failurePortNames.contains(portName);
+ public boolean isFailurePort(final Connectable connectable) {
+ if (connectable instanceof Port && ((Port)
connectable).getPortFunction() == PortFunction.FAILURE) {
+ return true;
+ }
+
+ return failurePortNames.contains(connectable.getName());
}
@Override
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 36e6f49430..63afae2adf 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -17,23 +17,6 @@
package org.apache.nifi.stateless.engine;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.attribute.expression.language.VariableImpact;
@@ -58,7 +41,6 @@ import
org.apache.nifi.controller.reporting.LogComponentStatuses;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
-import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
@@ -96,6 +78,24 @@ import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
import static java.util.Objects.requireNonNull;
public class StandardStatelessEngine implements StatelessEngine {
@@ -116,6 +116,7 @@ public class StandardStatelessEngine implements
StatelessEngine {
private final ProvenanceRepository provenanceRepository;
private final ExtensionRepository extensionRepository;
private final CounterRepository counterRepository;
+ private final LifecycleStateManager lifecycleStateManager;
private final Duration statusTaskInterval;
private final Duration componentEnableTimeout;
@@ -142,6 +143,7 @@ public class StandardStatelessEngine implements
StatelessEngine {
this.extensionRepository = requireNonNull(builder.extensionRepository,
"Extension Repository must be provided");
this.counterRepository = requireNonNull(builder.counterRepository,
"Counter Repository must be provided");
this.assetManager = requireNonNull(builder.assetManager, "Asset
Manager must be provided");
+ this.lifecycleStateManager =
requireNonNull(builder.lifecycleStateManager, "Lifecycle State Manager must be
provided");
this.statusTaskInterval = parseDuration(builder.statusTaskInterval);
this.componentEnableTimeout =
parseDuration(builder.componentEnableTimeout);
@@ -196,7 +198,6 @@ public class StandardStatelessEngine implements
StatelessEngine {
overrideParameters(parameterContextMap, parameterValueProvider);
final List<ReportingTaskNode> reportingTaskNodes =
createReportingTasks(dataflowDefinition);
- final LifecycleStateManager lifecycleStateManager = new
StandardLifecycleStateManager();
final StandardStatelessFlow dataflow = new
StandardStatelessFlow(childGroup, reportingTaskNodes,
controllerServiceProvider, processContextFactory,
repositoryContextFactory, dataflowDefinition,
stateManagerProvider, processScheduler, bulletinRepository,
lifecycleStateManager, componentEnableTimeout);
@@ -684,6 +685,7 @@ public class StandardStatelessEngine implements
StatelessEngine {
private CounterRepository counterRepository = null;
private String statusTaskInterval = null;
private String componentEnableTimeout = null;
+ private LifecycleStateManager lifecycleStateManager = null;
private AssetManager assetManager = null;
public Builder extensionManager(final ExtensionManager
extensionManager) {
@@ -751,6 +753,11 @@ public class StandardStatelessEngine implements
StatelessEngine {
return this;
}
+ public Builder lifecycleStateManager(final LifecycleStateManager
lifecycleStateManager) {
+ this.lifecycleStateManager = lifecycleStateManager;
+ return this;
+ }
+
public StandardStatelessEngine build() {
return new StandardStatelessEngine(this);
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index 9fbba530e5..84b5051f4e 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -33,6 +33,8 @@ import
org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.scheduling.LifecycleStateManager;
+import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager;
import org.apache.nifi.controller.scheduling.StatelessProcessScheduler;
import
org.apache.nifi.controller.scheduling.StatelessProcessSchedulerInitializationContext;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -156,8 +158,9 @@ public class StandardStatelessDataflowFactory implements
StatelessDataflowFactor
final StatelessStateManagerProvider stateManagerProvider = new
StatelessStateManagerProvider();
final ParameterContextManager parameterContextManager = new
StandardParameterContextManager();
+ final LifecycleStateManager lifecycleStateManager = new
StandardLifecycleStateManager();
final Duration processorStartTimeoutDuration =
Duration.ofSeconds((long)
FormatUtils.getPreciseTimeDuration(engineConfiguration.getProcessorStartTimeout(),
TimeUnit.SECONDS));
- processScheduler = new StatelessProcessScheduler(extensionManager,
processorStartTimeoutDuration);
+ processScheduler = new StatelessProcessScheduler(extensionManager,
lifecycleStateManager, processorStartTimeoutDuration);
provenanceRepo = new StatelessProvenanceRepository(1_000);
provenanceRepo.initialize(EventReporter.NO_OP, new
StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(),
IdentifierLookup.EMPTY);
@@ -226,6 +229,7 @@ public class StandardStatelessDataflowFactory implements
StatelessDataflowFactor
.counterRepository(counterRepo)
.statusTaskInterval(engineConfiguration.getStatusTaskInterval())
.componentEnableTimeout(engineConfiguration.getComponentEnableTimeout())
+ .lifecycleStateManager(lifecycleStateManager)
.build();
final StatelessFlowManager flowManager = new
StatelessFlowManager(flowFileEventRepo, parameterContextManager,
statelessEngine, () -> true, sslContext, bulletinRepository);
@@ -240,7 +244,7 @@ public class StandardStatelessDataflowFactory implements
StatelessDataflowFactor
flowFileRepo = new StatelessFlowFileRepository();
final RepositoryContextFactory repositoryContextFactory = new
StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
- counterRepo, provenanceRepo, stateManagerProvider);
+ counterRepo, stateManagerProvider);
final StatelessEngineInitializationContext
statelessEngineInitializationContext = new
StatelessEngineInitializationContext(controllerServiceProvider, flowManager,
processContextFactory,
repositoryContextFactory);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 330b2a0f92..d94a7c25da 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -31,12 +31,14 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.RepositoryRecord;
@@ -46,11 +48,12 @@ import
org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
-import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
@@ -84,7 +87,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -95,6 +100,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class StandardStatelessFlow implements StatelessDataflow {
@@ -126,8 +133,8 @@ public class StandardStatelessFlow implements
StatelessDataflow {
private volatile ExecutorService runDataflowExecutor;
private volatile ScheduledExecutorService backgroundTaskExecutor;
private volatile boolean initialized = false;
- private volatile Boolean stateful = null;
private volatile boolean shutdown = false;
+ private volatile boolean manageControllerServices = true;
public StandardStatelessFlow(final ProcessGroup rootGroup, final
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider
controllerServiceProvider,
final ProcessContextFactory
processContextFactory, final RepositoryContextFactory repositoryContextFactory,
final DataflowDefinition dataflowDefinition,
@@ -236,6 +243,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
}
initialized = true;
+ manageControllerServices =
initializationContext.isEnableControllerServices();
// Trigger validation to occur so that components can be
enabled/started.
performValidation();
@@ -370,29 +378,48 @@ public class StandardStatelessFlow implements
StatelessDataflow {
}
logger.info("Stopping all components");
- final CompletableFuture<Void> stopComponentsFuture =
rootGroup.stopComponents();
+
+ final Set<ProcessorNode> runningProcessors = new
HashSet<>(rootGroup.findAllProcessors());
+ unscheduleProcessors(runningProcessors);
// Wait for the graceful shutdown period for all processors to stop.
If the processors do not stop within this time,
// then interrupt them.
if (runDataflowExecutor != null && interruptProcessors) {
if (gracefulShutdownDuration.isZero()) {
logger.info("Shutting down all components immediately without
waiting for graceful shutdown period");
+ tracker.triggerFailureCallbacks(new TerminatedTaskException());
runDataflowExecutor.shutdownNow();
} else {
- try {
-
stopComponentsFuture.get(gracefulShutdownDuration.toMillis(),
TimeUnit.MILLISECONDS);
- } catch (final Exception e) {
- logger.warn("Stateless flow failed to stop all components
gracefully after {} millis. Interrupting all running components.",
gracefulShutdownDuration.toMillis(), e);
- if (e instanceof InterruptedException) {
- Thread.interrupted();
- }
-
+ final boolean gracefullyStopped =
waitForProcessorThreadsToComplete(runningProcessors, gracefulShutdownDuration);
+ if (gracefullyStopped) {
+ logger.info("All Processors have finished running;
triggering session callbacks");
+ tracker.triggerCallbacks();
+ } else {
+ logger.warn("{} Processors did not finish running within
the graceful shutdown period of {} millis. Interrupting all running components.
Processors still running: {}",
+ runningProcessors.size(),
gracefulShutdownDuration.toMillis(), runningProcessors);
+ tracker.triggerFailureCallbacks(new
TerminatedTaskException());
runDataflowExecutor.shutdownNow();
}
}
+ } else if (runDataflowExecutor != null) {
+ waitForProcessorThreadsToComplete(runningProcessors,
gracefulShutdownDuration);
+ tracker.triggerCallbacks();
+ }
+
+ // Stop components but do not trigger @OnUnscheduled because those
were already triggered.
+ final CompletableFuture<Void> stopFuture =
rootGroup.stopComponents(ProcessorStopLifecycleMethods.TRIGGER_ONSTOPPED);
+ try {
+ stopFuture.get(gracefulShutdownDuration.toMillis(),
TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (final Exception ignored) {
+ // Whether the Processors stopped gracefully or not doesn't
matter, we will continue to terminate any active processor.
+ }
+
+ for (final ProcessorNode processorNode :
rootGroup.findAllProcessors()) {
+ processScheduler.terminateProcessor(processorNode);
}
- stopComponentsFuture.join();
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
if (triggerComponentShutdown) {
@@ -402,23 +429,32 @@ public class StandardStatelessFlow implements
StatelessDataflow {
reportingTasks.forEach(processScheduler::unschedule);
final Set<ControllerServiceNode> allControllerServices =
rootGroup.findAllControllerServices();
- logger.info("Disabling {} Controller Services",
allControllerServices.size());
- try {
-
controllerServiceProvider.disableControllerServicesAsync(allControllerServices).get();
- } catch (final InterruptedException ie) {
- Thread.currentThread().interrupt();
- logger.error("Failed to properly disable one or more of the
following Controller Services: {} due to being interrupted while waiting for
them to disable", allControllerServices, ie);
- } catch (final Exception e) {
- logger.error("Failed to properly disable one or more of the
following Controller Services: {}", allControllerServices, e);
+ if (manageControllerServices) {
+ logger.info("Disabling {} Controller Services",
allControllerServices.size());
+
+ if (!allControllerServices.isEmpty()) {
+ try {
+
controllerServiceProvider.disableControllerServicesAsync(allControllerServices).get();
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ logger.error("Failed to properly disable one or more of
the following Controller Services due to being interrupted while waiting for
them to disable: {}",
+ allControllerServices, ie);
+ } catch (final Exception e) {
+ logger.error("Failed to properly disable one or more of
the following Controller Services: {}", allControllerServices, e);
+ }
+
+ logger.info("Finished disabling all Controller Services");
+ }
}
- logger.info("Finished disabling all Controller Services");
stateManagerProvider.shutdown();
// invoke any methods annotated with @OnShutdown on Controller Services
if (triggerComponentShutdown) {
- allControllerServices.forEach(cs ->
processScheduler.shutdownControllerService(cs, controllerServiceProvider));
+ if (manageControllerServices) {
+ allControllerServices.forEach(cs ->
processScheduler.shutdownControllerService(cs, controllerServiceProvider));
+ }
// invoke any methods annotated with @OnShutdown on Reporting Tasks
reportingTasks.forEach(processScheduler::shutdownReportingTask);
@@ -430,6 +466,51 @@ public class StandardStatelessFlow implements
StatelessDataflow {
logger.info("Finished shutting down dataflow");
}
+ private void unscheduleProcessors(final Set<ProcessorNode>
runningProcessors) {
+ for (final ProcessorNode processor : runningProcessors) {
+ if (!processor.isRunning()) {
+ continue;
+ }
+
+ final ProcessContext processContext =
processContextFactory.createProcessContext(processor);
+ processor.triggerOnUnscheduled(processContext);
+ }
+ }
+
+ private boolean waitForProcessorThreadsToComplete(final Set<ProcessorNode>
runningProcessors, final Duration gracefulShutdownDuration) {
+ final long maxEndTime = System.currentTimeMillis() +
gracefulShutdownDuration.toMillis();
+
+ // While there are still elements in the runningProcessors set, we
will continue to check if they have stopped.
+ while (!runningProcessors.isEmpty()) {
+ // Get a list of all stopped processors so that we can remove
them. We must convert to a List and then remove
+ // in order to avoid a ConcurrentModificationException.
+ final List<ProcessorNode> stopped = runningProcessors.stream()
+ .filter(proc -> proc.getActiveThreadCount() == 0)
+ .toList();
+
+ stopped.forEach(runningProcessors::remove);
+
+ // If there are no running processors left, then we can return
true. Otherwise, if we've reached out max time,
+ // we return false. If neither stopping condition is met, sleep
for a bit and check again.
+ if (runningProcessors.isEmpty()) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > maxEndTime) {
+ return false;
+ }
+
+ try {
+ Thread.sleep(10);
+ } catch (final InterruptedException e) {
+ Thread.interrupted();
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
public StatelessDataflowValidation performValidation() {
final Map<ComponentNode, List<ValidationResult>> resultsMap = new
HashMap<>();
@@ -465,15 +546,26 @@ public class StandardStatelessFlow implements
StatelessDataflow {
private void enableControllerServices(final ProcessGroup processGroup) {
final Set<ControllerServiceNode> services =
processGroup.getControllerServices(false);
- for (final ControllerServiceNode serviceNode : services) {
- final Future<?> future =
controllerServiceProvider.enableControllerServiceAndDependencies(serviceNode);
+ final Future<Void> future =
controllerServiceProvider.enableControllerServicesAsync(services);
- try {
- future.get(this.componentEnableTimeoutMillis,
TimeUnit.MILLISECONDS);
- } catch (final Exception e) {
- throw new IllegalStateException("Controller Service " +
serviceNode + " has not fully enabled. Current Validation Status is "
- + serviceNode.getValidationStatus() + " with validation
Errors: " + serviceNode.getValidationErrors(), e);
+ try {
+ future.get(this.componentEnableTimeoutMillis,
TimeUnit.MILLISECONDS);
+ } catch (final Exception e) {
+ final StringBuilder validationMessage = new StringBuilder("The
following Controller Services have not fully enabled:\n");
+ for (final ControllerServiceNode serviceNode : services) {
+ if (serviceNode.getState() == ControllerServiceState.ENABLED) {
+ continue;
+ }
+
+ validationMessage.append("Controller Service
").append(serviceNode)
+ .append(" has Validation Status ")
+ .append(serviceNode.getValidationStatus())
+ .append(" with validation Errors: ")
+ .append(serviceNode.getValidationErrors())
+ .append("\n");
}
+
+ throw new
IllegalStateException(validationMessage.toString().trim(), e);
}
processGroup.getProcessGroups().forEach(this::enableControllerServices);
@@ -609,17 +701,6 @@ public class StandardStatelessFlow implements
StatelessDataflow {
}
}
- @Override
- public boolean isStateful() {
- if (stateful == null) {
- final boolean hasStatefulReportingTask =
reportingTasks.stream().anyMatch(this::isStateful);
- if (hasStatefulReportingTask) {
- return true;
- }
- stateful = isStateful(rootGroup);
- }
- return stateful;
- }
private boolean isStateful(final ProcessGroup processGroup) {
final boolean hasStatefulProcessor =
processGroup.getProcessors().stream().anyMatch(this::isStateful);
@@ -783,63 +864,38 @@ public class StandardStatelessFlow implements
StatelessDataflow {
}
@Override
- public void setComponentStates(final Map<String, String> componentStates,
final Scope scope) {
- final Map<String, StateMap> stateMaps =
deserializeStateMaps(componentStates);
- stateManagerProvider.updateComponentsStates(stateMaps, scope);
- }
-
- private Map<String, StateMap> deserializeStateMaps(final Map<String,
String> componentStates) {
- if (componentStates == null) {
- return Collections.emptyMap();
- }
-
- final Map<String, StateMap> deserializedStateMaps = new HashMap<>();
-
- for (final Map.Entry<String, String> entry :
componentStates.entrySet()) {
- final String componentId = entry.getKey();
- final String serialized = entry.getValue();
-
- final SerializableStateMap deserialized;
- try {
- deserialized = objectMapper.readValue(serialized,
SerializableStateMap.class);
- } catch (final Exception e) {
- // We want to avoid throwing an Exception here because if we
do, we may never be able to run the flow again, at least not without
- // destroying all state that exists for the component. Would
be better to simply skip the state for this component
- logger.error("Failed to deserialized components' state for
component with ID {}. State will be reset to empty", componentId, e);
- continue;
- }
-
- final StateMap stateMap = new
StandardStateMap(deserialized.getStateValues(),
Optional.ofNullable(deserialized.getVersion()));
- deserializedStateMaps.put(componentId, stateMap);
- }
-
- return deserializedStateMaps;
+ public BulletinRepository getBulletinRepository() {
+ return bulletinRepository;
}
@Override
- public boolean isSourcePrimaryNodeOnly() {
- for (final Connectable connectable : rootConnectables) {
- if (connectable.isIsolated()) {
- return true;
- }
- }
-
- return false;
+ public OptionalLong getCounter(final String componentId, final String
counterName) {
+ final String instanceId = findInstanceId(componentId);
+ return findCounter(counter -> counter.getContext().endsWith(" (" +
instanceId + ")") && counter.getName().equals(counterName));
}
@Override
- public long getSourceYieldExpiration() {
- long latest = 0L;
- for (final Connectable connectable : rootConnectables) {
- latest = Math.max(latest, connectable.getYieldExpiration());
- }
-
- return latest;
+ public Map<String, Long> getCounters(final Pattern counterNamePattern) {
+ final CounterRepository counterRepository =
repositoryContextFactory.getCounterRepository();
+ return counterRepository.getCounters().stream()
+ .filter(counter -> !counter.getContext().startsWith("All ") &&
counterNamePattern.matcher(counter.getName()).matches())
+ .collect(Collectors.toMap(Counter::getName, Counter::getValue));
}
- @Override
- public BulletinRepository getBulletinRepository() {
- return bulletinRepository;
+ private String findInstanceId(final String componentId) {
+ return rootGroup.findAllProcessors().stream()
+ .filter(processor -> Objects.equals(processor.getIdentifier(),
componentId) || Objects.equals(processor.getVersionedComponentId().orElse(""),
componentId))
+ .findFirst()
+ .map(ProcessorNode::getIdentifier)
+ .orElse(null);
+ }
+
+ private OptionalLong findCounter(final Predicate<Counter> predicate) {
+ final CounterRepository counterRepository =
repositoryContextFactory.getCounterRepository();
+ return counterRepository.getCounters().stream()
+ .filter(predicate)
+ .mapToLong(Counter::getValue)
+ .findFirst();
}
@SuppressWarnings("unused")
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/RepositoryContextFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/RepositoryContextFactory.java
index 5643fb6d95..f073562802 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/RepositoryContextFactory.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/RepositoryContextFactory.java
@@ -19,6 +19,7 @@ package org.apache.nifi.stateless.repository;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryContext;
@@ -33,5 +34,7 @@ public interface RepositoryContextFactory {
FlowFileEventRepository getFlowFileEventRepository();
+ CounterRepository getCounterRepository();
+
void shutdown();
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
index 49cc0e3353..b034d0f82d 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
@@ -42,7 +42,7 @@ public class StatelessRepositoryContextFactory implements
RepositoryContextFacto
private final StateManagerProvider stateManagerProvider;
public StatelessRepositoryContextFactory(final ContentRepository
contentRepository, final FlowFileRepository flowFileRepository, final
FlowFileEventRepository flowFileEventRepository,
- final CounterRepository
counterRepository, final ProvenanceEventRepository provenanceRepository, final
StateManagerProvider stateManagerProvider) {
+ final CounterRepository
counterRepository, final StateManagerProvider stateManagerProvider) {
this.contentRepository = contentRepository;
this.flowFileRepository = flowFileRepository;
this.flowFileEventRepository = flowFileEventRepository;
@@ -71,6 +71,11 @@ public class StatelessRepositoryContextFactory implements
RepositoryContextFacto
return flowFileEventRepository;
}
+ @Override
+ public CounterRepository getCounterRepository() {
+ return counterRepository;
+ }
+
@Override
public void shutdown() {
contentRepository.shutdown();
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
index 4f78639b83..7695043f36 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
@@ -17,9 +17,11 @@
package org.apache.nifi.stateless.session;
+import org.apache.nifi.components.PortFunction;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
@@ -92,8 +94,8 @@ public class StatelessProcessSession extends
StandardProcessSession {
// If we don't require synchronous commits, we can trigger the async
commit, but we can't call the callback yet, because we only can call the
success callback when we've completed the
// dataflow in order to ensure that we don't destroy data in a way
that it can't be replayed if the downstream processors fail.
if (!requireSynchronousCommits) {
- super.commitAsync();
tracker.addCallback(connectable, onSuccess, onFailure, this);
+ super.commitAsync();
return;
}
@@ -232,7 +234,7 @@ public class StatelessProcessSession extends
StandardProcessSession {
return false;
}
- if (executionProgress.isFailurePort(connectable.getName())) {
+ if (connectable instanceof Port && ((Port)
connectable).getPortFunction() == PortFunction.FAILURE) {
return true;
}
diff --git
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
index 8bc146d089..d735b02483 100644
---
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
+++
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
@@ -43,6 +43,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -67,7 +68,7 @@ public class StatelessSystemIT {
@AfterEach
public void shutdownFlows() {
- createdFlows.forEach(StatelessDataflow::shutdown);
+ createdFlows.forEach(dataflow -> dataflow.shutdown(true, true,
Duration.ofSeconds(1)));
}
protected StatelessEngineConfiguration getEngineConfiguration() {
diff --git
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
index 6eb246a384..fe62588992 100644
---
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
+++
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
@@ -17,6 +17,8 @@
package org.apache.nifi.stateless.basics;
+import org.apache.nifi.components.PortFunction;
+import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
@@ -25,19 +27,35 @@ import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class ProcessorLifecycleIT extends StatelessSystemIT {
+ public static final String STOPPED_AND_SUCCESS = "Stopped and Success";
+ public static final String STOPPED_AND_FAILURE = "Stopped and Failure";
+ public static final String RUNNING_AND_SUCCESS = "Running and Success";
+ public static final String RUNNING_AND_FAILURE = "Running and Failure";
+
private static final Logger logger =
LoggerFactory.getLogger(ProcessorLifecycleIT.class);
@Test
@@ -53,7 +71,7 @@ public class ProcessorLifecycleIT extends StatelessSystemIT {
flowBuilder.createConnection(generate, writeLifecycleEvents,
"success");
writeLifecycleEvents.setAutoTerminatedRelationships(Collections.singleton("success"));
- writeLifecycleEvents.setProperties(Collections.singletonMap("Event
File", eventsFile.getAbsolutePath()));
+ writeLifecycleEvents.setProperties(Map.of("Event File",
eventsFile.getAbsolutePath()));
final StatelessDataflow dataflow =
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
@@ -75,4 +93,156 @@ public class ProcessorLifecycleIT extends StatelessSystemIT
{
assertEquals(Arrays.asList("OnScheduled", "OnTrigger",
"OnUnscheduled", "OnStopped"), events);
}
+
+ @Test
+ @Timeout(value = 30, unit = TimeUnit.SECONDS)
+ public void testSessionCommitCallbacksCalledBeforeStopOnFailure() throws
StatelessConfigurationException, IOException, InterruptedException {
+ final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+ final VersionedProcessor generate =
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+ final VersionedPort port = flowBuilder.createOutputPort("Out");
+ port.setPortFunction(PortFunction.FAILURE);
+
+ flowBuilder.createConnection(generate, port, "success");
+ final StatelessDataflow dataflow =
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+ final DataflowTrigger trigger = dataflow.trigger();
+ final TriggerResult result = trigger.getResult();
+
+ assertFalse(result.isSuccessful());
+
+ assertCounters(dataflow, generate.getIdentifier(),
RUNNING_AND_FAILURE);
+ }
+
+ private void assertCounters(final StatelessDataflow dataflow, final String
componentId, final String positiveCounterName) {
+ for (final String counterName : new String[] {RUNNING_AND_SUCCESS,
RUNNING_AND_FAILURE, STOPPED_AND_SUCCESS, STOPPED_AND_FAILURE}) {
+ final OptionalLong counterValue = dataflow.getCounter(componentId,
counterName);
+
+ if (Objects.equals(counterName, positiveCounterName)) {
+ assertTrue(counterValue.isPresent(), "Expected Counter '" +
counterName + "' to be present but it was not. Counters: " +
dataflow.getCounters(Pattern.compile(".+")));
+ assertEquals(1, counterValue.getAsLong());
+ } else {
+ assertTrue(counterValue.isEmpty(), "Counter '" + counterName +
"' has a value of " + counterValue);
+ }
+ }
+ }
+
+ @Test
+ @Timeout(value = 30, unit = TimeUnit.SECONDS)
+ public void testSessionCommitCallbacksCalledBeforeStopOnTimeout() throws
StatelessConfigurationException, IOException, InterruptedException {
+ final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+ final VersionedProcessor generate =
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+ final VersionedProcessor sleep =
flowBuilder.createSimpleProcessor("Sleep");
+ sleep.setProperties(Map.of("onTrigger Sleep Time", "1 min"));
+ sleep.setAutoTerminatedRelationships(Set.of("success"));
+
+ flowBuilder.createConnection(generate, sleep, "success");
+ final StatelessDataflow dataflow =
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+ final DataflowTrigger trigger = dataflow.trigger();
+ final Optional<TriggerResult> optionalTriggerResult =
trigger.getResult(100, TimeUnit.MILLISECONDS);
+ assertTrue(optionalTriggerResult.isEmpty());
+ trigger.cancel();
+
+ // Give it 2 seconds to finish its processing and make sure that we
see no counters incremented.
+ // We expect no counters to be incremented because, even though the we
should see an attempt to increment
+ // the counter, the ProcessSession should have been terminated,
disallowing the call to ProcessSession.adjustCounter()
+ Thread.sleep(2000L);
+ final Map<String, Long> counters =
dataflow.getCounters(Pattern.compile(".+"));
+ assertTrue(counters.isEmpty(), "Expected no counters to be incremented
but found: " + counters);
+ }
+
+ @Test
+ @Timeout(value = 30, unit = TimeUnit.SECONDS)
+ public void testSessionCommitCallbacksCalledBeforeStopOnSuccess() throws
StatelessConfigurationException, IOException, InterruptedException {
+ final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+ final VersionedProcessor generate =
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+ final VersionedPort port = flowBuilder.createOutputPort("Out");
+ port.setPortFunction(PortFunction.STANDARD); // Being explicit that
port should not be a failure port.
+
+ flowBuilder.createConnection(generate, port, "success");
+ final StatelessDataflow dataflow =
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+ final DataflowTrigger trigger = dataflow.trigger();
+ final TriggerResult result = trigger.getResult();
+
+ assertTrue(result.isSuccessful());
+ result.acknowledge();
+
+ assertCounters(dataflow, generate.getIdentifier(),
RUNNING_AND_SUCCESS);
+ }
+
+ @Test
+ @Timeout(value = 30, unit = TimeUnit.SECONDS)
+ public void
testSessionCommitCallbacksCalledBeforeStopOnShutdownWhenProcessorDoesNotGracefullyFinish()
throws StatelessConfigurationException, IOException, InterruptedException {
+ final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+ final VersionedProcessor generate =
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+ final VersionedProcessor sleep =
flowBuilder.createSimpleProcessor("Sleep");
+ sleep.setProperties(Map.of("onTrigger Sleep Time", "1 min", "Stop
Sleeping When Unscheduled", "false"));
+ sleep.setAutoTerminatedRelationships(Set.of("success"));
+
+ flowBuilder.createConnection(generate, sleep, "success");
+ final StatelessDataflow dataflow =
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+ final DataflowTrigger trigger = dataflow.trigger();
+ final Optional<TriggerResult> optionalTriggerResult =
trigger.getResult(100, TimeUnit.MILLISECONDS);
+ assertTrue(optionalTriggerResult.isEmpty());
+ dataflow.shutdown(false, true, Duration.ofMillis(1));
+
+ while (dataflow.getCounters(Pattern.compile(".+")).isEmpty()) {
+ Thread.sleep(100L);
+ }
+
+ assertCounters(dataflow, generate.getIdentifier(),
RUNNING_AND_FAILURE);
+ }
+
+ @Test
+ @Timeout(value = 30, unit = TimeUnit.SECONDS)
+ public void
testSessionCommitCallbacksCalledBeforeStopOnShutdownWhenProcessorFinishes()
throws StatelessConfigurationException, IOException, InterruptedException {
+ final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+ final VersionedProcessor generate =
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+ final VersionedProcessor sleep =
flowBuilder.createSimpleProcessor("Sleep");
+ sleep.setProperties(Map.of("onTrigger Sleep Time", "1 min", "Stop
Sleeping When Unscheduled", "true"));
+ sleep.setAutoTerminatedRelationships(Set.of("success"));
+
+ flowBuilder.createConnection(generate, sleep, "success");
+ final StatelessDataflow dataflow =
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+ final DataflowTrigger trigger = dataflow.trigger();
+ final Optional<TriggerResult> optionalTriggerResult =
trigger.getResult(100, TimeUnit.MILLISECONDS);
+ assertTrue(optionalTriggerResult.isEmpty());
+ dataflow.shutdown(false, true, Duration.ofSeconds(5));
+
+ while (dataflow.getCounters(Pattern.compile(".+")).isEmpty()) {
+ Thread.sleep(100L);
+ }
+
+ assertCounters(dataflow, generate.getIdentifier(),
RUNNING_AND_SUCCESS);
+ }
+
+ @Test
+ @Timeout(value = 30, unit = TimeUnit.SECONDS)
+ public void
testSessionCommitCallbacksCalledBeforeStopOnShutdownWithTimeout() throws
StatelessConfigurationException, IOException, InterruptedException {
+ final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+ final VersionedProcessor generate =
flowBuilder.createSimpleProcessor("GenerateAndCountCallbacks");
+ final VersionedProcessor sleep =
flowBuilder.createSimpleProcessor("Sleep");
+ sleep.setProperties(Map.of("onTrigger Sleep Time", "1 min",
+ "Ignore Interrupts", "true",
+ "Stop Sleeping When Unscheduled", "false"));
+ sleep.setAutoTerminatedRelationships(Set.of("success"));
+
+ flowBuilder.createConnection(generate, sleep, "success");
+ final StatelessDataflow dataflow =
loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+ final DataflowTrigger trigger = dataflow.trigger();
+ final Optional<TriggerResult> optionalTriggerResult =
trigger.getResult(2, TimeUnit.SECONDS);
+ assertTrue(optionalTriggerResult.isEmpty());
+ dataflow.shutdown(false, true, Duration.ofMillis(1));
+
+ while (dataflow.getCounters(Pattern.compile(".+")).isEmpty()) {
+ Thread.sleep(100L);
+ }
+
+ assertCounters(dataflow, generate.getIdentifier(),
RUNNING_AND_FAILURE);
+ }
+
}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateAndCountCallbacks.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateAndCountCallbacks.java
new file mode 100644
index 0000000000..9c8f7eb30a
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateAndCountCallbacks.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.time.Duration;
+import java.util.Set;
+
+@CapabilityDescription("Generates an empty FlowFile and counts how many times
the session commit callback is called " +
+ "for both success and failure before and after
Processor is stopped")
+public class GenerateAndCountCallbacks extends AbstractProcessor {
+
+ public static final String STOPPED_AND_SUCCESS = "Stopped and Success";
+ public static final String STOPPED_AND_FAILURE = "Stopped and Failure";
+ public static final String RUNNING_AND_SUCCESS = "Running and Success";
+ public static final String RUNNING_AND_FAILURE = "Running and Failure";
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles are routed to this relationship")
+ .build();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Set.of(REL_SUCCESS);
+ }
+
+ private volatile boolean stopped = false;
+
+ @OnScheduled
+ public void onScheduled() {
+ stopped = false;
+ getLogger().info("Processor started");
+ }
+
+ @OnStopped
+ public void onStopped() {
+ stopped = true;
+ getLogger().info("Processor stopped");
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.create();
+ session.transfer(flowFile, REL_SUCCESS);
+
+ session.commitAsync(() -> {
+ sleepUninterruptibly(Duration.ofSeconds(3));
+
+ if (stopped) {
+ session.adjustCounter(STOPPED_AND_SUCCESS, 1, true);
+ getLogger().error("Success callback called after Processor
Stopped");
+ } else {
+ session.adjustCounter(RUNNING_AND_SUCCESS, 1, true);
+ getLogger().info("Success callback called while Processor
Running");
+ }
+ },
+ failureCause -> {
+ sleepUninterruptibly(Duration.ofSeconds(3));
+
+ if (stopped) {
+ session.adjustCounter(STOPPED_AND_FAILURE, 1, true);
+ getLogger().error("Failure callback called after Processor
Stopped; Failure cause: {}", failureCause.toString());
+ } else {
+ session.adjustCounter(RUNNING_AND_FAILURE, 1, true);
+ getLogger().warn("Failure callback called while Processor
Running; Failure cause: {}", failureCause.toString());
+ }
+ });
+ }
+
+ private void sleepUninterruptibly(final Duration duration) {
+ long endTime = System.currentTimeMillis() + duration.toMillis();
+ while (System.currentTimeMillis() < endTime) {
+ try {
+ Thread.sleep(endTime - System.currentTimeMillis());
+ } catch (final InterruptedException ignored) {
+ // Ignore interruption and continue sleeping until the
specified duration has elapsed
+ }
+ }
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Sleep.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Sleep.java
index 4edbd945e0..03427938d9 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Sleep.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Sleep.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.cs.tests.system.SleepService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
@@ -31,7 +32,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -73,6 +73,32 @@ public class Sleep extends AbstractProcessor {
.required(false)
.identifiesControllerService(SleepService.class)
.build();
+ static final PropertyDescriptor IGNORE_INTERRUPTS = new Builder()
+ .name("Ignore Interrupts")
+ .description("If true, the processor will not respond to interrupts
while sleeping.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+ static final PropertyDescriptor STOP_SLEEPING_WHEN_UNSCHEDULED = new
Builder()
+ .name("Stop Sleeping When Unscheduled")
+ .description("If true, the processor will stop sleeping whenever the
processor is unscheduled. " +
+ "If false, the processor will continue sleeping until the sleep
time has elapsed. This property only applies to the " +
+ "onTrigger Sleep Time.")
+ .required(false)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ private static final List<PropertyDescriptor> properties = List.of(
+ VALIDATE_SLEEP_TIME,
+ ON_SCHEDULED_SLEEP_TIME,
+ ON_TRIGGER_SLEEP_TIME,
+ ON_STOPPED_SLEEP_TIME,
+ SLEEP_SERVICE,
+ IGNORE_INTERRUPTS,
+ STOP_SLEEPING_WHEN_UNSCHEDULED
+ );
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -80,46 +106,61 @@ public class Sleep extends AbstractProcessor {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(VALIDATE_SLEEP_TIME);
- properties.add(ON_SCHEDULED_SLEEP_TIME);
- properties.add(ON_TRIGGER_SLEEP_TIME);
- properties.add(ON_STOPPED_SLEEP_TIME);
- properties.add(SLEEP_SERVICE);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
- return Collections.singleton(REL_SUCCESS);
+ return Set.of(REL_SUCCESS);
}
@Override
protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
final long sleepMillis =
validationContext.getProperty(VALIDATE_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
- sleep(sleepMillis);
+ sleep(sleepMillis, isIgnoreInterrupt(validationContext), false);
return Collections.emptyList();
}
- private void sleep(final long millis) {
- if (millis > 0L) {
+ private void sleep(final long millis, final boolean ignoreInterrupts,
final boolean stopSleepOnUnscheduled) {
+ if (millis == 0L) {
+ return;
+ }
+
+ final long stopTime = System.currentTimeMillis() + millis;
+ while (System.currentTimeMillis() < stopTime) {
+ if (stopSleepOnUnscheduled && !isScheduled()) {
+ return;
+ }
+
+ // Sleep for up to the stopTime but no more than 50 milliseconds
at a time. This gives us a chance
+ // to periodically check if the processor is still scheduled
+ final long sleepMillis = Math.min(stopTime -
System.currentTimeMillis(), 50L);
try {
- Thread.sleep(millis);
- } catch (final InterruptedException ie) {
+ Thread.sleep(sleepMillis);
+ } catch (final InterruptedException e) {
+ if (ignoreInterrupts) {
+ continue;
+ }
+
Thread.currentThread().interrupt();
+ return;
}
}
}
@OnScheduled
public void onEnabled(final ProcessContext context) {
-
sleep(context.getProperty(ON_SCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
+
sleep(context.getProperty(ON_SCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
isIgnoreInterrupt(context), false);
+ }
+
+ private boolean isIgnoreInterrupt(final PropertyContext context) {
+ return context.getProperty(IGNORE_INTERRUPTS).asBoolean();
}
@OnStopped
public void onDisabled(final ProcessContext context) {
-
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
+
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
isIgnoreInterrupt(context), false);
}
@@ -131,13 +172,16 @@ public class Sleep extends AbstractProcessor {
}
final long sleepMillis =
context.getProperty(ON_TRIGGER_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
- sleep(sleepMillis);
+ final boolean ignoreInterrupts = isIgnoreInterrupt(context);
+ final boolean stopSleepOnUnscheduled =
context.getProperty(STOP_SLEEPING_WHEN_UNSCHEDULED).asBoolean();
+ sleep(sleepMillis, ignoreInterrupts, stopSleepOnUnscheduled);
final SleepService service =
context.getProperty(SLEEP_SERVICE).asControllerService(SleepService.class);
if (service != null) {
service.sleep();
}
+ getLogger().info("Finished onTrigger sleep");
session.transfer(flowFile, REL_SUCCESS);
}
}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 91c2062c38..9df0ad1a4d 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -27,6 +27,7 @@
org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect
org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
org.apache.nifi.processors.tests.system.FakeProcessor
org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
+org.apache.nifi.processors.tests.system.GenerateAndCountCallbacks
org.apache.nifi.processors.tests.system.GenerateFlowFile
org.apache.nifi.processors.tests.system.HoldInput
org.apache.nifi.processors.tests.system.IngestFile
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
index 77020c914b..780438ca2c 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
@@ -533,7 +533,7 @@ public class StatelessBasicsIT extends NiFiSystemIT {
@Test
public void testStopGroupMakesFlowFileAvailable() throws
NiFiClientException, IOException, InterruptedException {
- createFlowShell();
+ createFlowShell("3 sec");
// Add a sleep for 1 min
final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep",
statelessGroup.getId());