This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d5b626f0e4 NIFI-10108 Processor scheduling via parameter (#6115)
d5b626f0e4 is described below
commit d5b626f0e40bba7cb8fb0af14a4e344f24954c68
Author: timeabarna <[email protected]>
AuthorDate: Thu Jun 30 16:11:13 2022 +0200
NIFI-10108 Processor scheduling via parameter (#6115)
- NIFI-10108 Processor scheduling via parameter
- Refactoring component referencing check in AbstractComponentNode and
StandardParameterReferenceManager classes.
---
.../org/apache/nifi/controller/Triggerable.java | 2 +-
.../nifi/controller/StandardProcessorNode.java | 149 +++++++++++++++++----
.../service/StandardControllerServiceNode.java | 6 +
.../StandardVersionedComponentSynchronizer.java | 2 +-
.../StandardParameterReferenceManager.java | 18 +--
.../org/apache/nifi/connectable/Connectable.java | 2 +
.../nifi/controller/AbstractComponentNode.java | 35 ++++-
.../org/apache/nifi/controller/AbstractPort.java | 9 +-
.../org/apache/nifi/controller/ComponentNode.java | 11 ++
.../org/apache/nifi/controller/StandardFunnel.java | 7 +-
.../nifi/controller/TestAbstractComponentNode.java | 5 +
.../nifi/controller/StandardFlowSnippet.java | 2 +-
.../nifi/controller/XmlFlowSynchronizer.java | 2 +-
.../reporting/StandardReportingTaskNode.java | 9 ++
.../scheduling/QuartzSchedulingAgent.java | 3 +-
.../scheduling/ProcessorLifecycleIT.java | 2 +-
.../nifi/integration/FrameworkIntegrationTest.java | 4 +-
.../processgroup/StandardProcessGroupIT.java | 2 +-
.../org/apache/nifi/remote/StandardPublicPort.java | 2 +-
.../nifi/remote/StandardRemoteGroupPort.java | 2 +-
.../nifi/web/dao/impl/StandardProcessorDAO.java | 15 ++-
.../reporting/StatelessReportingTaskNode.java | 9 ++
22 files changed, 234 insertions(+), 64 deletions(-)
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
index 5255c0504e..40ba6dc67b 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
@@ -113,5 +113,5 @@ public interface Triggerable {
*
* @param schedulingPeriod to set
*/
- void setScheduldingPeriod(String schedulingPeriod);
+ void setSchedulingPeriod(String schedulingPeriod);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index de860a7b95..eb8386a830 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -61,8 +61,14 @@ import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.InstanceClassLoader;
import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.parameter.ExpressionLanguageAgnosticParameterParser;
+import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterLookup;
+import org.apache.nifi.parameter.ParameterParser;
+import org.apache.nifi.parameter.ParameterReference;
+import org.apache.nifi.parameter.ParameterTokenList;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
@@ -127,6 +133,8 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
public static final String DEFAULT_YIELD_PERIOD = "1 sec";
public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
+ private static final String RUN_SCHEDULE = "Run Schedule";
+
private final AtomicReference<ProcessGroup> processGroup;
private final AtomicReference<ProcessorDetails> processorRef;
private final AtomicReference<String> identifier;
@@ -150,6 +158,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
private volatile long yieldNanos;
private volatile ScheduledState desiredState = ScheduledState.STOPPED;
private volatile LogLevel bulletinLevel = LogLevel.WARN;
+ private volatile List<ParameterReference> parameterReferences =
Collections.emptyList();
private SchedulingStrategy schedulingStrategy; // guarded by synchronized
keyword
private ExecutionNode executionNode;
@@ -220,7 +229,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
LOG.error(String.format("Error while setting scheduling
strategy from DefaultSchedule annotation: %s", ex.getMessage()), ex);
}
try {
- this.setScheduldingPeriod(dsc.period());
+ this.setSchedulingPeriod(dsc.period());
} catch (Throwable ex) {
this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN);
LOG.error(String.format("Error while setting scheduling
period from DefaultSchedule annotation: %s", ex.getMessage()), ex);
@@ -504,36 +513,20 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
}
@Override
- @SuppressWarnings("deprecation")
- public synchronized void setScheduldingPeriod(final String
schedulingPeriod) {
+ public synchronized void setSchedulingPeriod(final String
schedulingPeriod) {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor
configuration while the Processor is running");
}
- switch (schedulingStrategy) {
- case CRON_DRIVEN: {
- try {
- new CronExpression(schedulingPeriod);
- } catch (final Exception e) {
- throw new IllegalArgumentException(
- "Scheduling Period is not a valid cron expression: " +
schedulingPeriod);
- }
- }
- break;
- case PRIMARY_NODE_ONLY:
- case TIMER_DRIVEN: {
- final long schedulingNanos =
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod),
- TimeUnit.NANOSECONDS);
- if (schedulingNanos < 0) {
- throw new IllegalArgumentException("Scheduling Period must be
positive");
- }
- this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS,
schedulingNanos));
- }
- break;
- case EVENT_DRIVEN:
- default:
- return;
- }
+ //Before setting the new Configuration references, we need to remove
the current ones from reference counts.
+ parameterReferences.forEach(parameterReference ->
decrementReferenceCounts(parameterReference.getParameterName()));
+
+ //Setting the new Configuration references.
+ final ParameterParser parameterParser = new
ExpressionLanguageAgnosticParameterParser();
+ final ParameterTokenList parameterTokenList =
parameterParser.parseTokens(schedulingPeriod);
+
+ parameterReferences = new
ArrayList<>(parameterTokenList.toReferenceList());
+ parameterReferences.forEach(parameterReference ->
incrementReferenceCounts(parameterReference.getParameterName()));
this.schedulingPeriod.set(schedulingPeriod);
}
@@ -1197,6 +1190,98 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
return results;
}
+ @Override
+ @SuppressWarnings("deprecation")
+ public List<ValidationResult> validateConfig() {
+
+ final List<ValidationResult> results = new ArrayList<>();
+ final ParameterContext parameterContext = getParameterContext();
+
+ if (parameterContext == null && !this.parameterReferences.isEmpty()) {
+ results.add(new ValidationResult.Builder()
+ .subject(RUN_SCHEDULE)
+ .input("Parameter Context")
+ .valid(false)
+ .explanation("Processor configuration references one or
more Parameters but no Parameter Context is currently set on the Process
Group.")
+ .build());
+ } else {
+ for (final ParameterReference paramRef : parameterReferences) {
+ final Optional<Parameter> parameterRef =
parameterContext.getParameter(paramRef.getParameterName());
+ if (!parameterRef.isPresent() ) {
+ results.add(new ValidationResult.Builder()
+ .subject(RUN_SCHEDULE)
+ .input(paramRef.getParameterName())
+ .valid(false)
+ .explanation("Processor configuration references
Parameter '" + paramRef.getParameterName() +
+ "' but the currently selected Parameter
Context does not have a Parameter with that name")
+ .build());
+ } else {
+ final ParameterDescriptor parameterDescriptor =
parameterRef.get().getDescriptor();
+ if (parameterDescriptor.isSensitive()) {
+ results.add(new ValidationResult.Builder()
+ .subject(RUN_SCHEDULE)
+ .input(parameterDescriptor.getName())
+ .valid(false)
+ .explanation("Processor configuration cannot
reference sensitive parameters")
+ .build());
+ }
+ }
+ }
+
+ final String schedulingPeriod = getSchedulingPeriod();
+ final String evaluatedSchedulingPeriod =
evaluateParameters(schedulingPeriod);
+
+ if (evaluatedSchedulingPeriod != null) {
+ switch (schedulingStrategy) {
+ case CRON_DRIVEN: {
+ try {
+ new CronExpression(evaluatedSchedulingPeriod);
+ } catch (final Exception e) {
+ results.add(new ValidationResult.Builder()
+ .subject(RUN_SCHEDULE)
+ .input(schedulingPeriod)
+ .valid(false)
+ .explanation("Scheduling Period is not a
valid cron expression")
+ .build());
+ }
+ }
+ break;
+ case PRIMARY_NODE_ONLY:
+ case TIMER_DRIVEN: {
+ try {
+ final long schedulingNanos =
FormatUtils.getTimeDuration(requireNonNull(evaluatedSchedulingPeriod),
+ TimeUnit.NANOSECONDS);
+
+ if (schedulingNanos < 0) {
+ results.add(new ValidationResult.Builder()
+ .subject(RUN_SCHEDULE)
+ .input(schedulingPeriod)
+ .valid(false)
+ .explanation("Scheduling Period must
be positive")
+ .build());
+ }
+
+
this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
+
+ } catch (final Exception e) {
+ results.add(new ValidationResult.Builder()
+ .subject(RUN_SCHEDULE)
+ .input(schedulingPeriod)
+ .valid(false)
+ .explanation("Scheduling Period is not a
valid time duration")
+ .build());
+ }
+ }
+ break;
+ case EVENT_DRIVEN:
+ default:
+ return results;
+ }
+ }
+ }
+ return results;
+ }
+
@Override
public Requirement getInputRequirement() {
return processorRef.get().getInputRequirement();
@@ -1928,6 +2013,16 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
}
this.maxBackoffPeriod = maxBackoffPeriod;
}
+
+ @Override
+ public String evaluateParameters(final String value) {
+ final ParameterContext parameterContext = getParameterContext();
+ final ParameterParser parameterParser = new
ExpressionLanguageAgnosticParameterParser();
+ final ParameterTokenList parameterTokenList =
parameterParser.parseTokens(value);
+
+ return parameterTokenList.substitute(parameterContext);
+ }
+
private void monitorAsyncTask(final Future<?> taskFuture, final Future<?>
monitoringFuture, final long completionTimestamp) {
if (taskFuture.isDone()) {
monitoringFuture.cancel(false); // stop scheduling this task
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index d8673a2650..0c250499c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -32,6 +32,7 @@ import
org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationState;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
@@ -509,6 +510,11 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
return state;
}
+ @Override
+ protected List<ValidationResult> validateConfig() {
+ return Collections.emptyList();
+ }
+
/**
* Will atomically enable this service by invoking its @OnEnabled
operation.
* It uses CAS operation on {@link #stateTransition} to transition this
service
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index bc60d45ab3..fa1a07931f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -2537,7 +2537,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
processor.setProperties(properties, true,
sensitiveDynamicPropertyNames);
processor.setRunDuration(proposed.getRunDurationMillis(),
TimeUnit.MILLISECONDS);
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
- processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
+ processor.setSchedulingPeriod(proposed.getSchedulingPeriod());
processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
processor.setStyle(proposed.getStyle());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterReferenceManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterReferenceManager.java
index 5238285bc1..a05d5c57ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterReferenceManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterReferenceManager.java
@@ -105,7 +105,7 @@ public class StandardParameterReferenceManager implements
ParameterReferenceMana
for (final ProcessGroup group : referencingGroups) {
for (final T componentNode : componentFunction.apply(group)) {
- if (isComponentReferencing(componentNode, parameterName)) {
+ if (componentNode.isReferencingParameter(parameterName)) {
referencingComponents.add(componentNode);
continue;
}
@@ -115,22 +115,6 @@ public class StandardParameterReferenceManager implements
ParameterReferenceMana
return referencingComponents;
}
- private boolean isComponentReferencing(final ComponentNode componentNode,
final String parameterName) {
- for (final PropertyConfiguration configuration :
componentNode.getProperties().values()) {
- if (configuration == null) {
- continue;
- }
-
- for (final ParameterReference reference :
configuration.getParameterReferences()) {
- if (parameterName.equals(reference.getParameterName())) {
- return true;
- }
- }
- }
-
- return false;
- }
-
private Set<ParameterReferencedControllerServiceData>
getReferencedControllerServiceData(
final ComponentNode componentNode,
final String parameterName,
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
index 6e144ca7dc..cc006217c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
@@ -310,4 +310,6 @@ public interface Connectable extends Triggerable,
ComponentAuthorizable, Positio
String getMaxBackoffPeriod();
void setMaxBackoffPeriod(String maxBackoffPeriod);
+
+ String evaluateParameters(String value);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 32361f6576..c22df1610e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -754,6 +754,8 @@ public abstract class AbstractComponentNode implements
ComponentNode {
}
final List<ValidationResult> invalidParameterResults =
validateParameterReferences(validationContext);
+ invalidParameterResults.addAll(validateConfig());
+
if (!invalidParameterResults.isEmpty()) {
// At this point, we are not able to properly resolve all
property values, so we will not attempt to perform
// any further validation. Doing so would result in values
being reported as invalid and containing confusing explanations.
@@ -793,6 +795,16 @@ public abstract class AbstractComponentNode implements
ComponentNode {
.build());
}
+ /**
+ * Validates the current configuration, returning ValidationResults for any
+ * invalid configuration parameter.
+ *
+ * @return Collection of validation result objects for any invalid findings
+ * only. If the collection is empty then the component is valid.
Should guarantee
+ * non-null
+ */
+ protected abstract List<ValidationResult> validateConfig();
+
private List<ValidationResult> validateParameterReferences(final
ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
@@ -1040,6 +1052,15 @@ public abstract class AbstractComponentNode implements
ComponentNode {
// For any Property that references an updated Parameter, we need to
call onPropertyModified().
// Additionally, we need to trigger validation to run if this
component is affected by the parameter update.
boolean componentAffected = false;
+
+ //Determine if the component is affected by the Parameter Update
+ for (final String updatedParameterName : updatedParameters.keySet()) {
+ if (isReferencingParameter(updatedParameterName)) {
+ componentAffected = true;
+ break;
+ }
+ }
+
for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry
: properties.entrySet()) {
final PropertyDescriptor propertyDescriptor = entry.getKey();
final PropertyConfiguration configuration = entry.getValue();
@@ -1051,7 +1072,6 @@ public abstract class AbstractComponentNode implements
ComponentNode {
final String referencedParamName =
reference.getParameterName();
if (updatedParameters.containsKey(referencedParamName)) {
propertyAffected = true;
- componentAffected = true;
break;
}
}
@@ -1086,6 +1106,14 @@ public abstract class AbstractComponentNode implements
ComponentNode {
}
}
+ protected void incrementReferenceCounts(final String parameterName) {
+ parameterReferenceCounts.merge(parameterName, 1, (a, b) -> a == -1 ?
null : a + b);
+ }
+
+ protected void decrementReferenceCounts(final String parameterName) {
+ parameterReferenceCounts.merge(parameterName, -1, (a, b) -> a == 1 ?
null : a + b);
+ }
+
private ParameterLookup createParameterLookupForPreviousValues(final
Map<String, ParameterUpdate> updatedParameters) {
final ParameterContext currentContext = getParameterContext();
return new ParameterLookup() {
@@ -1350,4 +1378,9 @@ public abstract class AbstractComponentNode implements
ComponentNode {
}
protected abstract ParameterContext getParameterContext();
+
+ @Override
+ public boolean isReferencingParameter(final String parameterName) {
+ return parameterReferenceCounts.containsKey(parameterName);
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index dad2b67eb6..d42bc729f1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -465,7 +465,7 @@ public abstract class AbstractPort implements Port {
}
@Override
- public void setScheduldingPeriod(final String schedulingPeriod) {
+ public void setSchedulingPeriod(final String schedulingPeriod) {
final long schedulingNanos =
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod),
TimeUnit.NANOSECONDS);
if (schedulingNanos < 0) {
throw new IllegalArgumentException("Scheduling Period must be
positive");
@@ -701,4 +701,11 @@ public abstract class AbstractPort implements Port {
@Override
public void setMaxBackoffPeriod(String maxBackoffPeriod) {
}
+
+ @Override
+ public String evaluateParameters(String value) {
+ return value;
+ }
+
+
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index 3be5204bb5..31bb6a6e81 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -73,8 +73,19 @@ public interface ComponentNode extends ComponentAuthorizable
{
*/
Set<String> getReferencedParameterNames();
+ /**
+ * Determines whether the component is referencing any parameter
+ * @return true if there is any parameter reference
+ **/
boolean isReferencingParameter();
+ /**
+ * Determines whether the component is referencing the given parameter
+ * @param parameterName Parameter Name
+ * @return true if parameter is referenced
+ **/
+ boolean isReferencingParameter(final String parameterName);
+
/**
* Notifies the Component that the value of a parameter has changed
* @param parameterUpdates a Map of Parameter name to a ParameterUpdate
that describes how the Parameter changed
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index 026ad8eb45..a636e05767 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -442,7 +442,7 @@ public class StandardFunnel implements Funnel {
}
@Override
- public void setScheduldingPeriod(final String schedulingPeriod) {
+ public void setSchedulingPeriod(final String schedulingPeriod) {
final long schedulingNanos =
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod),
TimeUnit.NANOSECONDS);
if (schedulingNanos < 0) {
throw new IllegalArgumentException("Scheduling Period must be
positive");
@@ -619,6 +619,11 @@ public class StandardFunnel implements Funnel {
public void setMaxBackoffPeriod(String maxBackoffPeriod) {
}
+ @Override
+ public String evaluateParameters(String value) {
+ return value;
+ }
+
@Override
public Optional<String> getVersionedComponentId() {
return Optional.ofNullable(versionedComponentId.get());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
index c5ccc80860..f562516423 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
@@ -403,6 +403,11 @@ public class TestAbstractComponentNode {
return null;
}
+ @Override
+ protected List<ValidationResult> validateConfig() {
+ return Collections.emptyList();
+ }
+
@Override
public void verifyModifiable() throws IllegalStateException {
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
index 1490459ac7..e7eb0fc5c4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
@@ -411,7 +411,7 @@ public class StandardFlowSnippet implements FlowSnippet {
// ensure that the scheduling strategy is set prior to these
values
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
- procNode.setScheduldingPeriod(config.getSchedulingPeriod());
+ procNode.setSchedulingPeriod(config.getSchedulingPeriod());
final Set<Relationship> relationships = new HashSet<>();
if (processorDTO.getRelationships() != null) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java
index 86b5323baa..7d9ef97c04 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java
@@ -1256,7 +1256,7 @@ public class XmlFlowSynchronizer implements
FlowSynchronizer {
// must set scheduling strategy before these two
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
- procNode.setScheduldingPeriod(config.getSchedulingPeriod());
+ procNode.setSchedulingPeriod(config.getSchedulingPeriod());
if (config.getRunDurationMillis() != null) {
procNode.setRunDuration(config.getRunDurationMillis(),
TimeUnit.MILLISECONDS);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index 9ba5f713d8..55a97d2bbf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -22,6 +22,7 @@ import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.LoggableComponent;
@@ -36,6 +37,9 @@ import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingTask;
+import java.util.Collections;
+import java.util.List;
+
public class StandardReportingTaskNode extends AbstractReportingTaskNode
implements ReportingTaskNode {
private final FlowController flowController;
@@ -87,6 +91,11 @@ public class StandardReportingTaskNode extends
AbstractReportingTaskNode impleme
return new StandardReportingContext(flowController,
flowController.getBulletinRepository(), getEffectivePropertyValues(), this,
getVariableRegistry(), ParameterLookup.EMPTY);
}
+ @Override
+ protected List<ValidationResult> validateConfig() {
+ return Collections.emptyList();
+ }
+
@Override
protected ParameterContext getParameterContext() {
return null;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 71c590d6b4..3b15ee45fb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -105,7 +105,8 @@ public class QuartzSchedulingAgent extends
AbstractTimeBasedSchedulingAgent {
throw new IllegalStateException("Cannot schedule " + connectable +
" because it is already scheduled to run");
}
- final String cronSchedule = connectable.getSchedulingPeriod();
+ final String cronSchedule =
connectable.evaluateParameters(connectable.getSchedulingPeriod());
+
final CronExpression cronExpression;
try {
cronExpression = new CronExpression(cronSchedule);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
index b2f38fe620..c1377a7282 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -240,7 +240,7 @@ public class ProcessorLifecycleIT {
this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
testProcNode.setMaxConcurrentTasks(4);
- testProcNode.setScheduldingPeriod("500 millis");
+ testProcNode.setSchedulingPeriod("500 millis");
testProcNode.setAutoTerminatedRelationships(Collections.singleton(new
Relationship.Builder().name("success").build()));
testGroup.addProcessor(testProcNode);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index 46de9db952..965b3ac3e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -582,7 +582,7 @@ public class FrameworkIntegrationTest {
final FlowFileEvent initialReport = getStatusReport(processor);
final int initialInvocations = (initialReport == null) ? 0 :
initialReport.getInvocations();
- processor.setScheduldingPeriod("1 hour");
+ processor.setSchedulingPeriod("1 hour");
// We will only trigger the Processor to run once per hour. So we need to ensure that
// we don't trigger the Processor while it's yielded. So if its yield expiration is in the future,
@@ -600,7 +600,7 @@ public class FrameworkIntegrationTest {
}
stop(processor).get();
- processor.setScheduldingPeriod(schedulingPeriod);
+ processor.setSchedulingPeriod(schedulingPeriod);
}
protected FlowFileEvent getStatusReport(final ProcessorNode processor) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
index bb00309ec0..a2bc54fc44 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
@@ -200,7 +200,7 @@ public class StandardProcessGroupIT extends
FrameworkIntegrationTest {
final Set<ComponentNode> rootAffected =
getRootGroup().getComponentsAffectedByVariable("number");
assertTrue(rootAffected.isEmpty());
- processor.setScheduldingPeriod("1 hour");
+ processor.setSchedulingPeriod("1 hour");
child.startProcessor(processor, false);
getRootGroup().setVariables(Collections.singletonMap("number", "2"));
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
index ec1b5b3ea5..5c8318a82c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
@@ -109,7 +109,7 @@ public class StandardPublicPort extends AbstractPort
implements PublicPort {
super(id, name, type, scheduler);
- setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
+ setSchedulingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
this.authorizer = authorizer;
this.secure = secure;
this.identityMappings = identityMappings;
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index e30df55e93..b6e6b0460d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -103,7 +103,7 @@ public class StandardRemoteGroupPort extends
RemoteGroupPort {
this.remoteGroup = remoteGroup;
this.transferDirection = direction;
this.sslContext = sslContext;
- setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
+ setSchedulingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index fe8a02c2be..6972e54921 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -181,7 +181,7 @@ public class StandardProcessorDAO extends ComponentDAO
implements ProcessorDAO {
processor.setMaxConcurrentTasks(maxTasks);
}
if (isNotNull(schedulingPeriod)) {
- processor.setScheduldingPeriod(schedulingPeriod);
+ processor.setSchedulingPeriod(schedulingPeriod);
}
if (isNotNull(penaltyDuration)) {
processor.setPenalizationPeriod(penaltyDuration);
@@ -308,22 +308,25 @@ public class StandardProcessorDAO extends ComponentDAO
implements ProcessorDAO {
}
// validate the scheduling period based on the scheduling strategy
- if (isNotNull(config.getSchedulingPeriod())) {
+ final String schedulingPeriod = config.getSchedulingPeriod();
+ final String evaluatedSchedulingPeriod =
processorNode.evaluateParameters(schedulingPeriod);
+
+ if (isNotNull(schedulingPeriod) &&
isNotNull(evaluatedSchedulingPeriod)) {
switch (schedulingStrategy) {
case TIMER_DRIVEN:
case PRIMARY_NODE_ONLY:
- final Matcher schedulingMatcher =
FormatUtils.TIME_DURATION_PATTERN.matcher(config.getSchedulingPeriod());
+ final Matcher schedulingMatcher =
FormatUtils.TIME_DURATION_PATTERN.matcher(evaluatedSchedulingPeriod);
if (!schedulingMatcher.matches()) {
validationErrors.add("Scheduling period is not a valid
time duration (ie 30 sec, 5 min)");
}
break;
case CRON_DRIVEN:
try {
- new CronExpression(config.getSchedulingPeriod());
+ new CronExpression(evaluatedSchedulingPeriod);
} catch (final ParseException pe) {
- throw new
IllegalArgumentException(String.format("Scheduling Period '%s' is not a valid
cron expression: %s", config.getSchedulingPeriod(), pe.getMessage()));
+ throw new
IllegalArgumentException(String.format("Scheduling Period '%s' is not a valid
cron expression: %s", schedulingPeriod, pe.getMessage()));
} catch (final Exception e) {
- throw new IllegalArgumentException("Scheduling Period
is not a valid cron expression: " + config.getSchedulingPeriod());
+ throw new IllegalArgumentException("Scheduling Period
is not a valid cron expression: " + schedulingPeriod);
}
break;
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
index 5e218cdf31..6acfb48d21 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.reporting;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.LoggableComponent;
import org.apache.nifi.controller.ProcessScheduler;
@@ -33,6 +34,9 @@ import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.stateless.engine.StatelessEngine;
+import java.util.Collections;
+import java.util.List;
+
public class StatelessReportingTaskNode extends AbstractReportingTaskNode
implements ReportingTaskNode {
private final FlowManager flowManager;
private final StatelessEngine statelessEngine;
@@ -46,6 +50,11 @@ public class StatelessReportingTaskNode extends
AbstractReportingTaskNode implem
this.statelessEngine = statelessEngine;
}
+ @Override
+ protected List<ValidationResult> validateConfig() {
+ return Collections.emptyList();
+ }
+
@Override
protected ParameterContext getParameterContext() {
return null;