This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d5b626f0e4 NIFI-10108 Processor scheduling via parameter (#6115)
d5b626f0e4 is described below

commit d5b626f0e40bba7cb8fb0af14a4e344f24954c68
Author: timeabarna <[email protected]>
AuthorDate: Thu Jun 30 16:11:13 2022 +0200

    NIFI-10108 Processor scheduling via parameter (#6115)
    
    - NIFI-10108 Processor scheduling via parameter
    - Refactoring component referencing check in AbstractComponentNode and
      StandardParameterReferenceManager classes.
---
 .../org/apache/nifi/controller/Triggerable.java    |   2 +-
 .../nifi/controller/StandardProcessorNode.java     | 149 +++++++++++++++++----
 .../service/StandardControllerServiceNode.java     |   6 +
 .../StandardVersionedComponentSynchronizer.java    |   2 +-
 .../StandardParameterReferenceManager.java         |  18 +--
 .../org/apache/nifi/connectable/Connectable.java   |   2 +
 .../nifi/controller/AbstractComponentNode.java     |  35 ++++-
 .../org/apache/nifi/controller/AbstractPort.java   |   9 +-
 .../org/apache/nifi/controller/ComponentNode.java  |  11 ++
 .../org/apache/nifi/controller/StandardFunnel.java |   7 +-
 .../nifi/controller/TestAbstractComponentNode.java |   5 +
 .../nifi/controller/StandardFlowSnippet.java       |   2 +-
 .../nifi/controller/XmlFlowSynchronizer.java       |   2 +-
 .../reporting/StandardReportingTaskNode.java       |   9 ++
 .../scheduling/QuartzSchedulingAgent.java          |   3 +-
 .../scheduling/ProcessorLifecycleIT.java           |   2 +-
 .../nifi/integration/FrameworkIntegrationTest.java |   4 +-
 .../processgroup/StandardProcessGroupIT.java       |   2 +-
 .../org/apache/nifi/remote/StandardPublicPort.java |   2 +-
 .../nifi/remote/StandardRemoteGroupPort.java       |   2 +-
 .../nifi/web/dao/impl/StandardProcessorDAO.java    |  15 ++-
 .../reporting/StatelessReportingTaskNode.java      |   9 ++
 22 files changed, 234 insertions(+), 64 deletions(-)

diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
index 5255c0504e..40ba6dc67b 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
@@ -113,5 +113,5 @@ public interface Triggerable {
      *
      * @param schedulingPeriod to set
      */
-    void setScheduldingPeriod(String schedulingPeriod);
+    void setSchedulingPeriod(String schedulingPeriod);
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index de860a7b95..eb8386a830 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -61,8 +61,14 @@ import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.InstanceClassLoader;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.parameter.ExpressionLanguageAgnosticParameterParser;
+import org.apache.nifi.parameter.Parameter;
 import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterDescriptor;
 import org.apache.nifi.parameter.ParameterLookup;
+import org.apache.nifi.parameter.ParameterParser;
+import org.apache.nifi.parameter.ParameterReference;
+import org.apache.nifi.parameter.ParameterTokenList;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
@@ -127,6 +133,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
     public static final String DEFAULT_YIELD_PERIOD = "1 sec";
     public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
+    private static final String RUN_SCHEDULE = "Run Schedule";
+
     private final AtomicReference<ProcessGroup> processGroup;
     private final AtomicReference<ProcessorDetails> processorRef;
     private final AtomicReference<String> identifier;
@@ -150,6 +158,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     private volatile long yieldNanos;
     private volatile ScheduledState desiredState = ScheduledState.STOPPED;
     private volatile LogLevel bulletinLevel = LogLevel.WARN;
+    private volatile List<ParameterReference> parameterReferences = 
Collections.emptyList();
 
     private SchedulingStrategy schedulingStrategy; // guarded by synchronized 
keyword
     private ExecutionNode executionNode;
@@ -220,7 +229,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                     LOG.error(String.format("Error while setting scheduling 
strategy from DefaultSchedule annotation: %s", ex.getMessage()), ex);
                 }
                 try {
-                    this.setScheduldingPeriod(dsc.period());
+                    this.setSchedulingPeriod(dsc.period());
                 } catch (Throwable ex) {
                     
this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN);
                     LOG.error(String.format("Error while setting scheduling 
period from DefaultSchedule annotation: %s", ex.getMessage()), ex);
@@ -504,36 +513,20 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    @SuppressWarnings("deprecation")
-    public synchronized void setScheduldingPeriod(final String 
schedulingPeriod) {
+    public synchronized void setSchedulingPeriod(final String 
schedulingPeriod) {
         if (isRunning()) {
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
 
-        switch (schedulingStrategy) {
-        case CRON_DRIVEN: {
-            try {
-                new CronExpression(schedulingPeriod);
-            } catch (final Exception e) {
-                throw new IllegalArgumentException(
-                        "Scheduling Period is not a valid cron expression: " + 
schedulingPeriod);
-            }
-        }
-            break;
-        case PRIMARY_NODE_ONLY:
-        case TIMER_DRIVEN: {
-            final long schedulingNanos = 
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod),
-                    TimeUnit.NANOSECONDS);
-            if (schedulingNanos < 0) {
-                throw new IllegalArgumentException("Scheduling Period must be 
positive");
-            }
-            this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, 
schedulingNanos));
-        }
-            break;
-        case EVENT_DRIVEN:
-        default:
-            return;
-        }
+        //Before setting the new Configuration references, we need to remove 
the current ones from reference counts.
+        parameterReferences.forEach(parameterReference -> 
decrementReferenceCounts(parameterReference.getParameterName()));
+
+        //Setting the new Configuration references.
+        final ParameterParser parameterParser = new 
ExpressionLanguageAgnosticParameterParser();
+        final ParameterTokenList parameterTokenList = 
parameterParser.parseTokens(schedulingPeriod);
+
+        parameterReferences = new 
ArrayList<>(parameterTokenList.toReferenceList());
+        parameterReferences.forEach(parameterReference -> 
incrementReferenceCounts(parameterReference.getParameterName()));
 
         this.schedulingPeriod.set(schedulingPeriod);
     }
@@ -1197,6 +1190,98 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         return results;
     }
 
+    @Override
+    @SuppressWarnings("deprecation")
+    public List<ValidationResult> validateConfig() {
+
+        final List<ValidationResult> results = new ArrayList<>();
+        final ParameterContext parameterContext = getParameterContext();
+
+        if (parameterContext == null && !this.parameterReferences.isEmpty()) {
+            results.add(new ValidationResult.Builder()
+                    .subject(RUN_SCHEDULE)
+                    .input("Parameter Context")
+                    .valid(false)
+                    .explanation("Processor configuration references one or 
more Parameters but no Parameter Context is currently set on the Process 
Group.")
+                    .build());
+        } else {
+            for (final ParameterReference paramRef : parameterReferences) {
+                final Optional<Parameter> parameterRef = 
parameterContext.getParameter(paramRef.getParameterName());
+                if (!parameterRef.isPresent() ) {
+                    results.add(new ValidationResult.Builder()
+                            .subject(RUN_SCHEDULE)
+                            .input(paramRef.getParameterName())
+                            .valid(false)
+                            .explanation("Processor configuration references 
Parameter '" + paramRef.getParameterName() +
+                                    "' but the currently selected Parameter 
Context does not have a Parameter with that name")
+                            .build());
+                } else {
+                    final ParameterDescriptor parameterDescriptor = 
parameterRef.get().getDescriptor();
+                    if (parameterDescriptor.isSensitive()) {
+                        results.add(new ValidationResult.Builder()
+                                .subject(RUN_SCHEDULE)
+                                .input(parameterDescriptor.getName())
+                                .valid(false)
+                                .explanation("Processor configuration cannot 
reference sensitive parameters")
+                                .build());
+                    }
+                }
+            }
+
+            final String schedulingPeriod = getSchedulingPeriod();
+            final String evaluatedSchedulingPeriod = 
evaluateParameters(schedulingPeriod);
+
+            if (evaluatedSchedulingPeriod != null) {
+                switch (schedulingStrategy) {
+                    case CRON_DRIVEN: {
+                        try {
+                            new CronExpression(evaluatedSchedulingPeriod);
+                        } catch (final Exception e) {
+                            results.add(new ValidationResult.Builder()
+                                    .subject(RUN_SCHEDULE)
+                                    .input(schedulingPeriod)
+                                    .valid(false)
+                                    .explanation("Scheduling Period is not a 
valid cron expression")
+                                    .build());
+                        }
+                    }
+                    break;
+                    case PRIMARY_NODE_ONLY:
+                    case TIMER_DRIVEN: {
+                        try {
+                            final long schedulingNanos = 
FormatUtils.getTimeDuration(requireNonNull(evaluatedSchedulingPeriod),
+                                    TimeUnit.NANOSECONDS);
+
+                            if (schedulingNanos < 0) {
+                                results.add(new ValidationResult.Builder()
+                                        .subject(RUN_SCHEDULE)
+                                        .input(schedulingPeriod)
+                                        .valid(false)
+                                        .explanation("Scheduling Period must 
be positive")
+                                        .build());
+                            }
+
+                            
this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
+
+                        } catch (final Exception e) {
+                            results.add(new ValidationResult.Builder()
+                                    .subject(RUN_SCHEDULE)
+                                    .input(schedulingPeriod)
+                                    .valid(false)
+                                    .explanation("Scheduling Period is not a 
valid time duration")
+                                    .build());
+                        }
+                    }
+                    break;
+                    case EVENT_DRIVEN:
+                    default:
+                        return results;
+                }
+            }
+        }
+        return results;
+    }
+
     @Override
     public Requirement getInputRequirement() {
         return processorRef.get().getInputRequirement();
@@ -1928,6 +2013,16 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         }
         this.maxBackoffPeriod = maxBackoffPeriod;
     }
+
+    @Override
+    public String evaluateParameters(final String value) {
+        final ParameterContext parameterContext = getParameterContext();
+        final ParameterParser parameterParser = new 
ExpressionLanguageAgnosticParameterParser();
+        final ParameterTokenList parameterTokenList = 
parameterParser.parseTokens(value);
+
+        return parameterTokenList.substitute(parameterContext);
+    }
+
     private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> 
monitoringFuture, final long completionTimestamp) {
         if (taskFuture.isDone()) {
             monitoringFuture.cancel(false); // stop scheduling this task
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index d8673a2650..0c250499c7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -32,6 +32,7 @@ import 
org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationState;
 import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
@@ -509,6 +510,11 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
         return state;
     }
 
+    @Override
+    protected List<ValidationResult> validateConfig() {
+        return Collections.emptyList();
+    }
+
     /**
      * Will atomically enable this service by invoking its @OnEnabled 
operation.
      * It uses CAS operation on {@link #stateTransition} to transition this 
service
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index bc60d45ab3..fa1a07931f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -2537,7 +2537,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
             processor.setProperties(properties, true, 
sensitiveDynamicPropertyNames);
             processor.setRunDuration(proposed.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
             
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
-            processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
+            processor.setSchedulingPeriod(proposed.getSchedulingPeriod());
             
processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
             
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
             processor.setStyle(proposed.getStyle());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterReferenceManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterReferenceManager.java
index 5238285bc1..a05d5c57ab 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterReferenceManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterReferenceManager.java
@@ -105,7 +105,7 @@ public class StandardParameterReferenceManager implements 
ParameterReferenceMana
 
         for (final ProcessGroup group : referencingGroups) {
             for (final T componentNode : componentFunction.apply(group)) {
-                if (isComponentReferencing(componentNode, parameterName)) {
+                if (componentNode.isReferencingParameter(parameterName)) {
                     referencingComponents.add(componentNode);
                     continue;
                 }
@@ -115,22 +115,6 @@ public class StandardParameterReferenceManager implements 
ParameterReferenceMana
         return referencingComponents;
     }
 
-    private boolean isComponentReferencing(final ComponentNode componentNode, 
final String parameterName) {
-        for (final PropertyConfiguration configuration : 
componentNode.getProperties().values()) {
-            if (configuration == null) {
-                continue;
-            }
-
-            for (final ParameterReference reference : 
configuration.getParameterReferences()) {
-                if (parameterName.equals(reference.getParameterName())) {
-                    return true;
-                }
-            }
-        }
-
-        return false;
-    }
-
     private Set<ParameterReferencedControllerServiceData> 
getReferencedControllerServiceData(
         final ComponentNode componentNode,
         final String parameterName,
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
index 6e144ca7dc..cc006217c7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
@@ -310,4 +310,6 @@ public interface Connectable extends Triggerable, 
ComponentAuthorizable, Positio
     String getMaxBackoffPeriod();
 
     void setMaxBackoffPeriod(String maxBackoffPeriod);
+
+    String evaluateParameters(String value);
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 32361f6576..c22df1610e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -754,6 +754,8 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
             }
 
             final List<ValidationResult> invalidParameterResults = 
validateParameterReferences(validationContext);
+            invalidParameterResults.addAll(validateConfig());
+
             if (!invalidParameterResults.isEmpty()) {
                 // At this point, we are not able to properly resolve all 
property values, so we will not attempt to perform
                 // any further validation. Doing so would result in values 
being reported as invalid and containing confusing explanations.
@@ -793,6 +795,16 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
             .build());
     }
 
+    /**
+     * Validates the current configuration, returning ValidationResults for any
+     * invalid configuration parameter.
+     *
+     * @return Collection of validation result objects for any invalid findings
+     *         only. If the collection is empty then the component is valid. 
Should guarantee
+     *         non-null
+     */
+    protected abstract List<ValidationResult> validateConfig();
+
     private List<ValidationResult> validateParameterReferences(final 
ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>();
 
@@ -1040,6 +1052,15 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
         // For any Property that references an updated Parameter, we need to 
call onPropertyModified().
         // Additionally, we need to trigger validation to run if this 
component is affected by the parameter update.
         boolean componentAffected = false;
+
+        //Determine if the component is affected by the Parameter Update
+        for (final String updatedParameterName : updatedParameters.keySet()) {
+            if (isReferencingParameter(updatedParameterName)) {
+                componentAffected = true;
+                break;
+            }
+        }
+
         for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry 
: properties.entrySet()) {
             final PropertyDescriptor propertyDescriptor = entry.getKey();
             final PropertyConfiguration configuration = entry.getValue();
@@ -1051,7 +1072,6 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
                 final String referencedParamName = 
reference.getParameterName();
                 if (updatedParameters.containsKey(referencedParamName)) {
                     propertyAffected = true;
-                    componentAffected = true;
                     break;
                 }
             }
@@ -1086,6 +1106,14 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
         }
     }
 
+    protected void incrementReferenceCounts(final String parameterName) {
+        parameterReferenceCounts.merge(parameterName, 1, (a, b) -> a == -1 ? 
null : a + b);
+    }
+
+    protected void decrementReferenceCounts(final String parameterName) {
+        parameterReferenceCounts.merge(parameterName, -1, (a, b) -> a == 1 ? 
null : a + b);
+    }
+
     private ParameterLookup createParameterLookupForPreviousValues(final 
Map<String, ParameterUpdate> updatedParameters) {
         final ParameterContext currentContext = getParameterContext();
         return new ParameterLookup() {
@@ -1350,4 +1378,9 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
     }
 
     protected abstract ParameterContext getParameterContext();
+
+    @Override
+    public boolean isReferencingParameter(final String parameterName) {
+        return parameterReferenceCounts.containsKey(parameterName);
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index dad2b67eb6..d42bc729f1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -465,7 +465,7 @@ public abstract class AbstractPort implements Port {
     }
 
     @Override
-    public void setScheduldingPeriod(final String schedulingPeriod) {
+    public void setSchedulingPeriod(final String schedulingPeriod) {
         final long schedulingNanos = 
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), 
TimeUnit.NANOSECONDS);
         if (schedulingNanos < 0) {
             throw new IllegalArgumentException("Scheduling Period must be 
positive");
@@ -701,4 +701,11 @@ public abstract class AbstractPort implements Port {
     @Override
     public void setMaxBackoffPeriod(String maxBackoffPeriod) {
     }
+
+    @Override
+    public String evaluateParameters(String value) {
+        return value;
+    }
+
+
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index 3be5204bb5..31bb6a6e81 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -73,8 +73,19 @@ public interface ComponentNode extends ComponentAuthorizable 
{
      */
     Set<String> getReferencedParameterNames();
 
+    /**
+     * Determines whether the component is referencing any parameter
+     * @return true if there is any parameter reference
+     **/
     boolean isReferencingParameter();
 
+    /**
+     * Determines whether the component is referencing the given parameter
+     * @param parameterName Parameter Name
+     * @return true if parameter is referenced
+     **/
+    boolean isReferencingParameter(final String parameterName);
+
     /**
      * Notifies the Component that the value of a parameter has changed
      * @param parameterUpdates a Map of Parameter name to a ParameterUpdate 
that describes how the Parameter changed
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index 026ad8eb45..a636e05767 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -442,7 +442,7 @@ public class StandardFunnel implements Funnel {
     }
 
     @Override
-    public void setScheduldingPeriod(final String schedulingPeriod) {
+    public void setSchedulingPeriod(final String schedulingPeriod) {
         final long schedulingNanos = 
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), 
TimeUnit.NANOSECONDS);
         if (schedulingNanos < 0) {
             throw new IllegalArgumentException("Scheduling Period must be 
positive");
@@ -619,6 +619,11 @@ public class StandardFunnel implements Funnel {
     public void setMaxBackoffPeriod(String maxBackoffPeriod) {
     }
 
+    @Override
+    public String evaluateParameters(String value) {
+        return value;
+    }
+
     @Override
     public Optional<String> getVersionedComponentId() {
         return Optional.ofNullable(versionedComponentId.get());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
index c5ccc80860..f562516423 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
@@ -403,6 +403,11 @@ public class TestAbstractComponentNode {
             return null;
         }
 
+        @Override
+        protected List<ValidationResult> validateConfig() {
+            return Collections.emptyList();
+        }
+
         @Override
         public void verifyModifiable() throws IllegalStateException {
         }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
index 1490459ac7..e7eb0fc5c4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
@@ -411,7 +411,7 @@ public class StandardFlowSnippet implements FlowSnippet {
 
                 // ensure that the scheduling strategy is set prior to these 
values
                 
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
-                procNode.setScheduldingPeriod(config.getSchedulingPeriod());
+                procNode.setSchedulingPeriod(config.getSchedulingPeriod());
 
                 final Set<Relationship> relationships = new HashSet<>();
                 if (processorDTO.getRelationships() != null) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java
index 86b5323baa..7d9ef97c04 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java
@@ -1256,7 +1256,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
 
             // must set scheduling strategy before these two
             procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
-            procNode.setScheduldingPeriod(config.getSchedulingPeriod());
+            procNode.setSchedulingPeriod(config.getSchedulingPeriod());
             if (config.getRunDurationMillis() != null) {
                 procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
             }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index 9ba5f713d8..55a97d2bbf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -22,6 +22,7 @@ import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.LoggableComponent;
@@ -36,6 +37,9 @@ import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingTask;
 
+import java.util.Collections;
+import java.util.List;
+
 public class StandardReportingTaskNode extends AbstractReportingTaskNode implements ReportingTaskNode {
 
     private final FlowController flowController;
@@ -87,6 +91,11 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
         return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getEffectivePropertyValues(), this, getVariableRegistry(), ParameterLookup.EMPTY);
     }
 
+    @Override
+    protected List<ValidationResult> validateConfig() {
+        return Collections.emptyList();
+    }
+
     @Override
     protected ParameterContext getParameterContext() {
         return null;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 71c590d6b4..3b15ee45fb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -105,7 +105,8 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
             throw new IllegalStateException("Cannot schedule " + connectable + " because it is already scheduled to run");
         }
 
-        final String cronSchedule = connectable.getSchedulingPeriod();
+        final String cronSchedule = connectable.evaluateParameters(connectable.getSchedulingPeriod());
+
         final CronExpression cronExpression;
         try {
             cronExpression = new CronExpression(cronSchedule);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
index b2f38fe620..c1377a7282 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -240,7 +240,7 @@ public class ProcessorLifecycleIT {
         this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
 
         testProcNode.setMaxConcurrentTasks(4);
-        testProcNode.setScheduldingPeriod("500 millis");
+        testProcNode.setSchedulingPeriod("500 millis");
         testProcNode.setAutoTerminatedRelationships(Collections.singleton(new Relationship.Builder().name("success").build()));
 
         testGroup.addProcessor(testProcNode);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index 46de9db952..965b3ac3e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -582,7 +582,7 @@ public class FrameworkIntegrationTest {
         final FlowFileEvent initialReport = getStatusReport(processor);
         final int initialInvocations = (initialReport == null) ? 0 : initialReport.getInvocations();
 
-        processor.setScheduldingPeriod("1 hour");
+        processor.setSchedulingPeriod("1 hour");
 
         // We will only trigger the Processor to run once per hour. So we need to ensure that
         // we don't trigger the Processor while it's yielded. So if its yield expiration is in the future,
@@ -600,7 +600,7 @@ public class FrameworkIntegrationTest {
         }
 
         stop(processor).get();
-        processor.setScheduldingPeriod(schedulingPeriod);
+        processor.setSchedulingPeriod(schedulingPeriod);
     }
 
     protected FlowFileEvent getStatusReport(final ProcessorNode processor) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
index bb00309ec0..a2bc54fc44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
@@ -200,7 +200,7 @@ public class StandardProcessGroupIT extends FrameworkIntegrationTest {
         final Set<ComponentNode> rootAffected = getRootGroup().getComponentsAffectedByVariable("number");
         assertTrue(rootAffected.isEmpty());
 
-        processor.setScheduldingPeriod("1 hour");
+        processor.setSchedulingPeriod("1 hour");
         child.startProcessor(processor, false);
 
         getRootGroup().setVariables(Collections.singletonMap("number", "2"));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
index ec1b5b3ea5..5c8318a82c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
@@ -109,7 +109,7 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
 
         super(id, name, type, scheduler);
 
-        setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
+        setSchedulingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
         this.authorizer = authorizer;
         this.secure = secure;
         this.identityMappings = identityMappings;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index e30df55e93..b6e6b0460d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -103,7 +103,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         this.remoteGroup = remoteGroup;
         this.transferDirection = direction;
         this.sslContext = sslContext;
-        setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
+        setSchedulingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index fe8a02c2be..6972e54921 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -181,7 +181,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
                     processor.setMaxConcurrentTasks(maxTasks);
                 }
                 if (isNotNull(schedulingPeriod)) {
-                    processor.setScheduldingPeriod(schedulingPeriod);
+                    processor.setSchedulingPeriod(schedulingPeriod);
                 }
                 if (isNotNull(penaltyDuration)) {
                     processor.setPenalizationPeriod(penaltyDuration);
@@ -308,22 +308,25 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
         }
 
         // validate the scheduling period based on the scheduling strategy
-        if (isNotNull(config.getSchedulingPeriod())) {
+        final String schedulingPeriod = config.getSchedulingPeriod();
+        final String evaluatedSchedulingPeriod = processorNode.evaluateParameters(schedulingPeriod);
+
+        if (isNotNull(schedulingPeriod) && isNotNull(evaluatedSchedulingPeriod)) {
             switch (schedulingStrategy) {
                 case TIMER_DRIVEN:
                 case PRIMARY_NODE_ONLY:
-                    final Matcher schedulingMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(config.getSchedulingPeriod());
+                    final Matcher schedulingMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(evaluatedSchedulingPeriod);
                     if (!schedulingMatcher.matches()) {
                         validationErrors.add("Scheduling period is not a valid time duration (ie 30 sec, 5 min)");
                     }
                     break;
                 case CRON_DRIVEN:
                     try {
-                        new CronExpression(config.getSchedulingPeriod());
+                        new CronExpression(evaluatedSchedulingPeriod);
                     } catch (final ParseException pe) {
-                        throw new IllegalArgumentException(String.format("Scheduling Period '%s' is not a valid cron expression: %s", config.getSchedulingPeriod(), pe.getMessage()));
+                        throw new IllegalArgumentException(String.format("Scheduling Period '%s' is not a valid cron expression: %s", schedulingPeriod, pe.getMessage()));
                     } catch (final Exception e) {
-                        throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + config.getSchedulingPeriod());
+                        throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + schedulingPeriod);
                     }
                     break;
             }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
index 5e218cdf31..6acfb48d21 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.reporting;
 
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.LoggableComponent;
 import org.apache.nifi.controller.ProcessScheduler;
@@ -33,6 +34,9 @@ import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.stateless.engine.StatelessEngine;
 
+import java.util.Collections;
+import java.util.List;
+
 public class StatelessReportingTaskNode extends AbstractReportingTaskNode implements ReportingTaskNode {
     private final FlowManager flowManager;
     private final StatelessEngine statelessEngine;
@@ -46,6 +50,11 @@ public class StatelessReportingTaskNode extends AbstractReportingTaskNode implem
         this.statelessEngine = statelessEngine;
     }
 
+    @Override
+    protected List<ValidationResult> validateConfig() {
+        return Collections.emptyList();
+    }
+
     @Override
     protected ParameterContext getParameterContext() {
         return null;

Reply via email to