This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4210e30  NIFI-9620: Adding isStateful to StatelessDataflow (#5703)
4210e30 is described below

commit 4210e30047b7d7db17375caaba70d3465450d423
Author: Joe Gresock <[email protected]>
AuthorDate: Wed Feb 16 13:30:40 2022 -0500

    NIFI-9620: Adding isStateful to StatelessDataflow (#5703)
    
    * Adding isStateful method to reporting task and controller service
---
 .../apache/nifi/controller/ControllerService.java  | 10 +++++
 .../java/org/apache/nifi/processor/Processor.java  | 12 ++++++
 .../org/apache/nifi/reporting/ReportingTask.java   | 10 +++++
 .../processors/attributes/UpdateAttribute.java     |  5 +++
 .../nifi/stateless/flow/StatelessDataflow.java     |  6 +++
 .../stateless/bootstrap/StatelessBootstrap.java    |  8 +++-
 .../nifi/stateless/flow/StandardStatelessFlow.java | 48 ++++++++++++++++++++++
 7 files changed, 97 insertions(+), 2 deletions(-)

diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java 
b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java
index a77c69d..1baab99 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -177,4 +178,13 @@ public interface ControllerService extends 
ConfigurableComponent {
      */
     void initialize(ControllerServiceInitializationContext context) throws 
InitializationException;
 
+    /**
+     * Indicates whether this controller service, configured with the given 
{@link ConfigurationContext}, stores state.
+     * @param context provides access to convenience methods for obtaining 
property values
+     * @return True if this controller service stores state
+     */
+    default boolean isStateful(ConfigurationContext context) {
+        return this.getClass().isAnnotationPresent(Stateful.class);
+    }
+
 }
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java 
b/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java
index 145ed9d..74f973d 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processor;
 
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.processor.exception.ProcessException;
 
@@ -87,4 +88,15 @@ public interface Processor extends ConfigurableComponent {
      * indicate a probable coding defect.
      */
     void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException;
+
+    /**
+     * Indicates whether this processor, configured with the given {@link 
ProcessContext}, stores state.
+     * @param context provides access to convenience methods for obtaining
+     * property values, delaying the scheduling of the processor, provides
+     * access to Controller Services, etc.
+     * @return True if this processor stores state
+     */
+    default boolean isStateful(ProcessContext context) {
+        return this.getClass().isAnnotationPresent(Stateful.class);
+    }
 }
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingTask.java 
b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingTask.java
index 780b79e..b836cdf 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingTask.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingTask.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.reporting;
 
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
 import org.apache.nifi.components.ConfigurableComponent;
 
@@ -72,4 +73,13 @@ public interface ReportingTask extends ConfigurableComponent 
{
      * @param context reporting context
      */
     void onTrigger(ReportingContext context);
+
+    /**
+     * Indicates whether this reporting task, configured with the given {@link 
ReportingContext}, stores state.
+     * @param context provides access to convenience methods for obtaining 
property values
+     * @return True if this reporting task stores state
+     */
+    default boolean isStateful(ReportingContext context) {
+        return this.getClass().isAnnotationPresent(Stateful.class);
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index 314fb70..005d53a 100644
--- 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++ 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -226,6 +226,11 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
     }
 
     @Override
+    public boolean isStateful(final ProcessContext context) {
+        return 
!context.getProperty(STORE_STATE).getValue().equals(DO_NOT_STORE_STATE);
+    }
+
+    @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
         PropertyDescriptor.Builder propertyBuilder = new 
PropertyDescriptor.Builder()
                 .name(propertyDescriptorName)
diff --git 
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
 
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index dbd44cc..8ba1922 100644
--- 
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++ 
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -79,6 +79,12 @@ public interface StatelessDataflow {
 
     boolean isFlowFileQueued();
 
+    /**
+     *
+     * @return True if any processor, controller service, or reporting task in the dataflow stores state (by default, any component with the 
{@link org.apache.nifi.annotation.behavior.Stateful} annotation)
+     */
+    boolean isStateful();
+
     void purge();
 
     Map<String, String> getComponentStates(Scope scope);
diff --git 
a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
 
b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
index aa7dbc3..80754ad 100644
--- 
a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
+++ 
b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
@@ -185,6 +185,8 @@ public class StatelessBootstrap {
             throw new IOException("Could not get a listing of the NAR 
directory");
         }
 
+        logger.debug("NAR directory used to find files to allow being loaded 
by Stateless Extension Classloaders from parent {}: {}", parent, narDirectory);
+
         final Set<URL> urls = new HashSet<>();
         findClassLoaderUrls(parent, urls);
 
@@ -207,12 +209,14 @@ public class StatelessBootstrap {
                 filesAllowed.add(file.getName());
             }
         }
+        logger.debug("The following JAR files are proposed to be blocked from 
being loaded by Stateless Extensions ClassLoaders from parent {}: {}", parent, 
filesBlocked);
+        logger.debug("Of the full list above, the following JAR files will be 
explicitly allowed to be loaded by Stateless Extensions ClassLoaders from 
parent {}: {}", parent, filesAllowed);
 
         classesBlocked.removeAll(classesAllowed);
         filesBlocked.removeAll(filesAllowed);
+        logger.debug("The final list of JAR files blocked from being loaded by 
Stateless Extensions ClassLoaders from parent {}: {}", parent, filesBlocked);
 
-        logger.debug("Blocking the following JAR files from being loaded by 
Stateless Extensions ClassLoaders from parent {}: {}", parent, filesBlocked);
-        logger.debug("Blocking the following classes from being loaded by 
Stateless Extension ClassLoaders from parent {}: {}", parent, classesBlocked);
+        logger.debug("The final list of classes blocked from being loaded by 
Stateless Extension ClassLoaders from parent {}: {}", parent, classesBlocked);
 
         final BlockListClassLoader blockingClassLoader = new 
BlockListClassLoader(parent, classesBlocked);
         return blockingClassLoader;
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index a107a6a..ff03473 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -29,6 +29,8 @@ import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.LocalPort;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Counter;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
@@ -41,10 +43,12 @@ import 
org.apache.nifi.controller.repository.RepositoryContext;
 import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.controller.state.StandardStateMap;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
@@ -52,6 +56,7 @@ import 
org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.TerminatedTaskException;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.stateless.engine.ExecutionProgress;
 import org.apache.nifi.stateless.engine.ExecutionProgress.CompletionAction;
 import org.apache.nifi.stateless.engine.ProcessContextFactory;
@@ -114,6 +119,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
     private volatile ExecutorService runDataflowExecutor;
     private volatile ScheduledExecutorService backgroundTaskExecutor;
     private volatile boolean initialized = false;
+    private volatile Boolean stateful = null;
 
     public StandardStatelessFlow(final ProcessGroup rootGroup, final 
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider 
controllerServiceProvider,
                                  final ProcessContextFactory 
processContextFactory, final RepositoryContextFactory repositoryContextFactory, 
final DataflowDefinition<?> dataflowDefinition,
@@ -521,6 +527,48 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         }
     }
 
+    @Override
+    public boolean isStateful() {
+        if (stateful == null) {
+            final boolean hasStatefulReportingTask = 
reportingTasks.stream().anyMatch(this::isStateful);
+            if (hasStatefulReportingTask) {
+                return true;
+            }
+            stateful = isStateful(rootGroup);
+        }
+        return stateful;
+    }
+
+    private boolean isStateful(final ProcessGroup processGroup) {
+        final boolean hasStatefulProcessor = 
processGroup.getProcessors().stream().anyMatch(this::isStateful);
+
+        if (hasStatefulProcessor) {
+            return true;
+        }
+        final boolean hasStatefulControllerService = 
processGroup.getControllerServices(false).stream().anyMatch(this::isStateful);
+        if (hasStatefulControllerService) {
+            return true;
+        }
+
+        return 
processGroup.getProcessGroups().stream().anyMatch(this::isStateful);
+    }
+
+    private boolean isStateful(final ProcessorNode processorNode) {
+        final Processor processor = processorNode.getProcessor();
+        final ProcessContext context = 
processContextFactory.createProcessContext(processorNode);
+        return processor.isStateful(context);
+    }
+
+    private boolean isStateful(final ControllerServiceNode 
controllerServiceNode) {
+        final ControllerService controllerService = 
controllerServiceNode.getControllerServiceImplementation();
+        final ConfigurationContext context = new 
StandardConfigurationContext(controllerServiceNode, controllerServiceProvider, 
null, rootGroup.getVariableRegistry());
+        return controllerService.isStateful(context);
+    }
+
+    private boolean isStateful(final ReportingTaskNode reportingTaskNode) {
+        final ReportingTask reportingTask = 
reportingTaskNode.getReportingTask();
+        return 
reportingTask.isStateful(reportingTaskNode.getReportingContext());
+    }
 
     @Override
     public Set<String> getInputPortNames() {

Reply via email to