This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4210e30 NIFI-9620: Adding isStateful to StatelessDataflow (#5703)
4210e30 is described below
commit 4210e30047b7d7db17375caaba70d3465450d423
Author: Joe Gresock <[email protected]>
AuthorDate: Wed Feb 16 13:30:40 2022 -0500
NIFI-9620: Adding isStateful to StatelessDataflow (#5703)
* Adding isStateless method to reporting task and controller service
---
.../apache/nifi/controller/ControllerService.java | 10 +++++
.../java/org/apache/nifi/processor/Processor.java | 12 ++++++
.../org/apache/nifi/reporting/ReportingTask.java | 10 +++++
.../processors/attributes/UpdateAttribute.java | 5 +++
.../nifi/stateless/flow/StatelessDataflow.java | 6 +++
.../stateless/bootstrap/StatelessBootstrap.java | 8 +++-
.../nifi/stateless/flow/StandardStatelessFlow.java | 48 ++++++++++++++++++++++
7 files changed, 97 insertions(+), 2 deletions(-)
diff --git
a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java
b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java
index a77c69d..1baab99 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@@ -177,4 +178,13 @@ public interface ControllerService extends
ConfigurableComponent {
*/
void initialize(ControllerServiceInitializationContext context) throws
InitializationException;
+ /**
+ * Indicates whether this controller service, configured with the given
{@link ConfigurationContext}, stores state.
+ * @param context provides access to convenience methods for obtaining
property values
+ * @return True if this controller service stores state
+ */
+ default boolean isStateful(ConfigurationContext context) {
+ return this.getClass().isAnnotationPresent(Stateful.class);
+ }
+
}
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java
b/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java
index 145ed9d..74f973d 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processor;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.processor.exception.ProcessException;
@@ -87,4 +88,15 @@ public interface Processor extends ConfigurableComponent {
* indicate a probable coding defect.
*/
void onTrigger(ProcessContext context, ProcessSessionFactory
sessionFactory) throws ProcessException;
+
+ /**
+ * Indicates whether this processor, configured with the given {@link
ProcessContext}, stores state.
+ * @param context provides access to convenience methods for obtaining
+ * property values, delaying the scheduling of the processor, provides
+ * access to Controller Services, etc.
+ * @return True if this processor stores state
+ */
+ default boolean isStateful(ProcessContext context) {
+ return this.getClass().isAnnotationPresent(Stateful.class);
+ }
}
diff --git
a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingTask.java
b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingTask.java
index 780b79e..b836cdf 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingTask.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingTask.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.reporting;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.components.ConfigurableComponent;
@@ -72,4 +73,13 @@ public interface ReportingTask extends ConfigurableComponent
{
* @param context reporting context
*/
void onTrigger(ReportingContext context);
+
+ /**
+ * Indicates whether this reporting task, configured with the given {@link
ReportingContext}, stores state.
+ * @param context provides access to convenience methods for obtaining
property values
+ * @return True if this reporting task stores state
+ */
+ default boolean isStateful(ReportingContext context) {
+ return this.getClass().isAnnotationPresent(Stateful.class);
+ }
}
diff --git
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index 314fb70..005d53a 100644
---
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -226,6 +226,11 @@ public class UpdateAttribute extends AbstractProcessor
implements Searchable {
}
@Override
+ public boolean isStateful(final ProcessContext context) {
+ return
!context.getProperty(STORE_STATE).getValue().equals(DO_NOT_STORE_STATE);
+ }
+
+ @Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
PropertyDescriptor.Builder propertyBuilder = new
PropertyDescriptor.Builder()
.name(propertyDescriptorName)
diff --git
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index dbd44cc..8ba1922 100644
---
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -79,6 +79,12 @@ public interface StatelessDataflow {
boolean isFlowFileQueued();
+ /**
+ *
+ * @return True if there are any processors in the dataflow with the
{@link org.apache.nifi.annotation.behavior.Stateful} annotation
+ */
+ boolean isStateful();
+
void purge();
Map<String, String> getComponentStates(Scope scope);
diff --git
a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
index aa7dbc3..80754ad 100644
---
a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
+++
b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
@@ -185,6 +185,8 @@ public class StatelessBootstrap {
throw new IOException("Could not get a listing of the NAR
directory");
}
+ logger.debug("NAR directory used to find files to allow being loaded
by Stateless Extension Classloaders from parent {}: {}", parent, narDirectory);
+
final Set<URL> urls = new HashSet<>();
findClassLoaderUrls(parent, urls);
@@ -207,12 +209,14 @@ public class StatelessBootstrap {
filesAllowed.add(file.getName());
}
}
+ logger.debug("The following JAR files are proposed to be blocked from
being loaded by Stateless Extensions ClassLoaders from parent {}: {}", parent,
filesBlocked);
+ logger.debug("Of the full list above, the following JAR files will be
explicitly allowed to be loaded by Stateless Extensions ClassLoaders from
parent {}: {}", parent, filesAllowed);
classesBlocked.removeAll(classesAllowed);
filesBlocked.removeAll(filesAllowed);
+ logger.debug("The final list of JAR files blocked from being loaded by
Stateless Extensions ClassLoaders from parent {}: {}", parent, filesBlocked);
- logger.debug("Blocking the following JAR files from being loaded by
Stateless Extensions ClassLoaders from parent {}: {}", parent, filesBlocked);
- logger.debug("Blocking the following classes from being loaded by
Stateless Extension ClassLoaders from parent {}: {}", parent, classesBlocked);
+ logger.debug("The final list of classes blocked from being loaded by
Stateless Extension ClassLoaders from parent {}: {}", parent, classesBlocked);
final BlockListClassLoader blockingClassLoader = new
BlockListClassLoader(parent, classesBlocked);
return blockingClassLoader;
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index a107a6a..ff03473 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -29,6 +29,8 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.LocalPort;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
@@ -41,10 +43,12 @@ import
org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
@@ -52,6 +56,7 @@ import
org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ExecutionProgress.CompletionAction;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
@@ -114,6 +119,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
private volatile ExecutorService runDataflowExecutor;
private volatile ScheduledExecutorService backgroundTaskExecutor;
private volatile boolean initialized = false;
+ private volatile Boolean stateful = null;
public StandardStatelessFlow(final ProcessGroup rootGroup, final
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider
controllerServiceProvider,
final ProcessContextFactory
processContextFactory, final RepositoryContextFactory repositoryContextFactory,
final DataflowDefinition<?> dataflowDefinition,
@@ -521,6 +527,48 @@ public class StandardStatelessFlow implements
StatelessDataflow {
}
}
+ @Override
+ public boolean isStateful() {
+ if (stateful == null) {
+ final boolean hasStatefulReportingTask =
reportingTasks.stream().anyMatch(this::isStateful);
+ if (hasStatefulReportingTask) {
+ return true;
+ }
+ stateful = isStateful(rootGroup);
+ }
+ return stateful;
+ }
+
+ private boolean isStateful(final ProcessGroup processGroup) {
+ final boolean hasStatefulProcessor =
processGroup.getProcessors().stream().anyMatch(this::isStateful);
+
+ if (hasStatefulProcessor) {
+ return true;
+ }
+ final boolean hasStatefulControllerService =
processGroup.getControllerServices(false).stream().anyMatch(this::isStateful);
+ if (hasStatefulControllerService) {
+ return true;
+ }
+
+ return
processGroup.getProcessGroups().stream().anyMatch(this::isStateful);
+ }
+
+ private boolean isStateful(final ProcessorNode processorNode) {
+ final Processor processor = processorNode.getProcessor();
+ final ProcessContext context =
processContextFactory.createProcessContext(processorNode);
+ return processor.isStateful(context);
+ }
+
+ private boolean isStateful(final ControllerServiceNode
controllerServiceNode) {
+ final ControllerService controllerService =
controllerServiceNode.getControllerServiceImplementation();
+ final ConfigurationContext context = new
StandardConfigurationContext(controllerServiceNode, controllerServiceProvider,
null, rootGroup.getVariableRegistry());
+ return controllerService.isStateful(context);
+ }
+
+ private boolean isStateful(final ReportingTaskNode reportingTaskNode) {
+ final ReportingTask reportingTask =
reportingTaskNode.getReportingTask();
+ return
reportingTask.isStateful(reportingTaskNode.getReportingContext());
+ }
@Override
public Set<String> getInputPortNames() {