diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 2e2c8f75cb..e76d432d0d 100755
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -59,6 +59,7 @@
 import java.sql.Timestamp;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -167,7 +168,7 @@ private static Schema buildAvroSchema(final DataType 
dataType, final String fiel
 
                 for (final DataType option : options) {
                     final Schema optionSchema = buildAvroSchema(option, 
fieldName, false);
-                    if (!typesAdded.contains(optionSchema.getType())) {
+                    if (optionSchema != null && 
!typesAdded.contains(optionSchema.getType())) {
                         unionTypes.add(optionSchema);
                         typesAdded.add(optionSchema.getType());
                     }
@@ -406,7 +407,7 @@ public static RecordSchema createSchema(final Schema 
avroSchema, final String sc
         for (final Field field : avroSchema.getFields()) {
             final String fieldName = field.name();
             final Schema fieldSchema = field.schema();
-            final DataType dataType = 
AvroTypeUtil.determineDataType(fieldSchema, knownRecords);
+            final DataType dataType = determineDataType(fieldSchema, 
knownRecords);
             final boolean nullable = isNullable(fieldSchema);
             addFieldToList(recordFields, field, fieldName, fieldSchema, 
dataType, nullable);
         }
@@ -547,7 +548,7 @@ private static void addFieldToList(final List<RecordField> 
recordFields, final F
     }
 
     private static Long getLongFromTimestamp(final Object rawValue, final 
Schema fieldSchema, final String fieldName) {
-        final String format = 
AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+        final String format = determineDataType(fieldSchema).getFormat();
         Timestamp t = DataTypeUtils.toTimestamp(rawValue, () -> 
DataTypeUtils.getDateFormat(format), fieldName);
         return t.getTime();
     }
@@ -566,13 +567,13 @@ private static Object convertToAvroObject(final Object 
rawValue, final Schema fi
                 }
 
                 if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
-                    final String format = 
AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+                    final String format = 
determineDataType(fieldSchema).getFormat();
                     final Date date = DataTypeUtils.toDate(rawValue, () -> 
DataTypeUtils.getDateFormat(format), fieldName);
                     final Duration duration = Duration.between(new 
Date(0L).toInstant(), new Date(date.getTime()).toInstant());
                     final long days = duration.toDays();
                     return (int) days;
                 } else if 
(LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
-                    final String format = 
AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+                    final String format = 
determineDataType(fieldSchema).getFormat();
                     final Time time = DataTypeUtils.toTime(rawValue, () -> 
DataTypeUtils.getDateFormat(format), fieldName);
                     final Date date = new Date(time.getTime());
                     final Duration duration = 
Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), 
date.toInstant());
@@ -594,7 +595,7 @@ private static Object convertToAvroObject(final Object 
rawValue, final Schema fi
                     final Duration duration = 
Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), 
date.toInstant());
                     return duration.toMillis() * 1000L;
                 } else if 
(LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) {
-                    final String format = 
AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+                    final String format = 
determineDataType(fieldSchema).getFormat();
                     Timestamp t = DataTypeUtils.toTimestamp(rawValue, () -> 
DataTypeUtils.getDateFormat(format), fieldName);
                     return getLongFromTimestamp(rawValue, fieldSchema, 
fieldName);
                 } else if 
(LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) {
@@ -642,7 +643,7 @@ private static Object convertToAvroObject(final Object 
rawValue, final Schema fi
                     return ByteBuffer.wrap(((String) 
rawValue).getBytes(charset));
                 }
                 if (rawValue instanceof Object[]) {
-                    return AvroTypeUtil.convertByteArray((Object[]) rawValue);
+                    return convertByteArray((Object[]) rawValue);
                 } else {
                     throw new IllegalTypeConversionException("Cannot convert 
value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
                 }
@@ -672,24 +673,38 @@ private static Object convertToAvroObject(final Object 
rawValue, final Schema fi
             case RECORD:
                 final GenericData.Record avroRecord = new 
GenericData.Record(fieldSchema);
 
-                final Record record = (Record) rawValue;
-                for (final RecordField recordField : 
record.getSchema().getFields()) {
-                    final Object recordFieldValue = 
record.getValue(recordField);
-                    final String recordFieldName = recordField.getFieldName();
-
-                    final Field field = fieldSchema.getField(recordFieldName);
+                final Set<Map.Entry<String, Object>> entries;
+                if (rawValue instanceof Map) {
+                    final Map<String, Object> map = (Map<String, Object>) 
rawValue;
+                    entries = map.entrySet();
+                } else if (rawValue instanceof Record) {
+                    entries = new HashSet<>();
+                    final Record record = (Record) rawValue;
+                    record.getSchema().getFields().forEach(field -> 
entries.add(new AbstractMap.SimpleEntry<>(field.getFieldName(), 
record.getValue(field))));
+                } else {
+                    throw new IllegalTypeConversionException("Cannot convert 
value " + rawValue + " of type " + rawValue.getClass() + " to a Record");
+                }
+                for (final Map.Entry<String, Object> e : entries) {
+                    final Field field = fieldSchema.getField(e.getKey());
                     if (field == null) {
                         continue;
                     }
 
-                    final Object converted = 
convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + 
recordFieldName, charset);
-                    avroRecord.put(recordFieldName, converted);
+                    final Object converted = convertToAvroObject(e.getValue(), 
field.schema(), fieldName + "/" + e.getKey(), charset);
+                    avroRecord.put(e.getKey(), converted);
                 }
                 return avroRecord;
             case UNION:
                 return convertUnionFieldValue(rawValue, fieldSchema, schema -> 
convertToAvroObject(rawValue, schema, fieldName, charset), fieldName);
             case ARRAY:
-                final Object[] objectArray = (Object[]) rawValue;
+                final Object[] objectArray;
+                if (rawValue instanceof List) {
+                    objectArray = ((List) rawValue).toArray();
+                } else if (rawValue instanceof Object[]) {
+                    objectArray = (Object[]) rawValue;
+                } else {
+                    throw new IllegalTypeConversionException("Cannot convert 
value " + rawValue + " of type " + rawValue.getClass() + " to an Array");
+                }
                 final List<Object> list = new ArrayList<>(objectArray.length);
                 int i = 0;
                 for (final Object o : objectArray) {
@@ -773,7 +788,7 @@ private static Object convertUnionFieldValue(final Object 
originalValue, final S
             }
 
             foundNonNull = true;
-            final DataType desiredDataType = 
AvroTypeUtil.determineDataType(subSchema);
+            final DataType desiredDataType = determineDataType(subSchema);
             try {
                 final Object convertedValue = conversion.apply(subSchema);
 
@@ -891,7 +906,7 @@ private static Object normalizeValue(final Object value, 
final Schema avroSchema
                     final Object fieldValue = normalizeValue(avroFieldValue, 
field.schema(), fieldName + "/" + field.name());
                     values.put(field.name(), fieldValue);
                 }
-                final RecordSchema childSchema = 
AvroTypeUtil.createSchema(recordSchema, false);
+                final RecordSchema childSchema = createSchema(recordSchema, 
false);
                 return new MapRecord(childSchema, values);
             case BYTES:
                 final ByteBuffer bb = (ByteBuffer) value;
@@ -899,10 +914,10 @@ private static Object normalizeValue(final Object value, 
final Schema avroSchema
                 if (logicalType != null && 
LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
                     return new Conversions.DecimalConversion().fromBytes(bb, 
avroSchema, logicalType);
                 }
-                return AvroTypeUtil.convertByteArray(bb.array());
+                return convertByteArray(bb.array());
             case FIXED:
                 final GenericFixed fixed = (GenericFixed) value;
-                return AvroTypeUtil.convertByteArray(fixed.bytes());
+                return convertByteArray(fixed.bytes());
             case ENUM:
                 return value.toString();
             case NULL:
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 1f6c29bfb3..a68e5bcd52 100755
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -450,4 +450,21 @@ public void testBytesToStringConversion() {
         assertTrue(o instanceof String);
         assertEquals("Hello", o);
     }
+    @Test
+    public void testListToArrayConversion() {
+        final Charset charset = Charset.forName("UTF_32LE");
+        Object o = 
AvroTypeUtil.convertToAvroObject(Collections.singletonList("Hello"), 
Schema.createArray(Schema.create(Type.STRING)), charset);
+        assertTrue(o instanceof List);
+        assertEquals(1, ((List) o).size());
+        assertEquals("Hello", ((List) o).get(0));
+    }
+
+    @Test
+    public void testMapToRecordConversion() {
+        final Charset charset = Charset.forName("UTF_32LE");
+        Object o = 
AvroTypeUtil.convertToAvroObject(Collections.singletonMap("Hello", "World"),
+                Schema.createRecord(Collections.singletonList(new 
Field("Hello", Schema.create(Type.STRING), "", ""))), charset);
+        assertTrue(o instanceof Record);
+        assertEquals("World", ((Record) o).get("Hello"));
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index f3ae41fc1e..e17682eeed 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -373,9 +373,8 @@ public String toString() {
     }
 
     @Override
-    public final void performValidation() {
-        boolean replaced = false;
-        do {
+    public final ValidationStatus performValidation() {
+        while (true) {
             final ValidationState validationState = getValidationState();
 
             final ValidationContext validationContext = getValidationContext();
@@ -391,8 +390,11 @@ public final void performValidation() {
 
             final ValidationStatus status = results.isEmpty() ? 
ValidationStatus.VALID : ValidationStatus.INVALID;
             final ValidationState updatedState = new ValidationState(status, 
results);
-            replaced = replaceValidationState(validationState, updatedState);
-        } while (!replaced);
+            final boolean replaced = replaceValidationState(validationState, 
updatedState);
+            if (replaced) {
+                return status;
+            }
+        }
     }
 
     protected Collection<ValidationResult> computeValidationErrors(final 
ValidationContext validationContext) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index d0ed572515..2357d41cd0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -179,7 +179,7 @@ public default void setProperties(Map<String, String> 
properties) {
     /**
-     * Asynchronously begins the validation process
+     * Performs validation of this component and returns the resulting validation status
      */
-    public abstract void performValidation();
+    public abstract ValidationStatus performValidation();
 
     /**
      * Returns a {@link List} of all {@link PropertyDescriptor}s that this
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 6e8206eb71..12eeb88f23 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller;
 
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.scheduling.LifecycleState;
@@ -144,7 +145,13 @@ public ProcessorNode(final String id,
     public ScheduledState getScheduledState() {
         ScheduledState sc = this.scheduledState.get();
         if (sc == ScheduledState.STARTING) {
-            return ScheduledState.RUNNING;
+            final ValidationStatus validationStatus = getValidationStatus();
+
+            if (validationStatus == ValidationStatus.INVALID) {
+                return ScheduledState.STOPPED;
+            } else {
+                return ScheduledState.RUNNING;
+            }
         } else if (sc == ScheduledState.STOPPING) {
             return ScheduledState.STOPPED;
         }
@@ -240,4 +247,12 @@ public abstract void start(ScheduledExecutorService 
scheduler, long administrati
      * will result in the WARN message if processor can not be enabled.
      */
     public abstract void disable();
+
+    /**
+     * Returns the Scheduled State that is desired for this Processor. This 
may vary from the current state if the Processor is not
+     * currently valid, is in the process of stopping but should then 
transition to Running, etc.
+     *
+     * @return the desired state for this Processor
+     */
+    public abstract ScheduledState getDesiredState();
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index dad01b8a24..8ab7e69d54 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -912,7 +912,6 @@ public void trigger(final ComponentNode component) {
 
                     try {
                         if (connectable instanceof ProcessorNode) {
-                            ((ProcessorNode) 
connectable).getValidationStatus(5, TimeUnit.SECONDS);
                             
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
                         } else {
                             startConnectable(connectable);
@@ -1242,7 +1241,7 @@ public ScheduledState getScheduledState(final 
ProcessorNode procNode) {
                         return ScheduledState.RUNNING;
                     }
 
-                    return procNode.getScheduledState();
+                    return procNode.getDesiredState();
                 }
 
                 @Override
@@ -1699,7 +1698,6 @@ public void startReportingTask(final ReportingTaskNode 
reportingTaskNode) {
             throw new IllegalStateException("Cannot start reporting task " + 
reportingTaskNode.getIdentifier() + " because the controller is terminated");
         }
 
-        reportingTaskNode.performValidation(); // ensure that the reporting 
task has completed its validation before attempting to start it
         reportingTaskNode.verifyCanStart();
         reportingTaskNode.reloadAdditionalResourcesIfNecessary();
         processScheduler.schedule(reportingTaskNode);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 4adfad49b9..8ca51731e2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -38,6 +38,7 @@
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
@@ -139,7 +140,7 @@
     private final ProcessScheduler processScheduler;
     private long runNanos = 0L;
     private volatile long yieldNanos;
-    private volatile ScheduledState desiredState;
+    private volatile ScheduledState desiredState = ScheduledState.STOPPED;
     private volatile LogLevel bulletinLevel = LogLevel.WARN;
 
     private SchedulingStrategy schedulingStrategy; // guarded by read/write 
lock
@@ -1343,13 +1344,6 @@ public void disable() {
     public void start(final ScheduledExecutorService taskScheduler, final long 
administrativeYieldMillis, final long timeoutMillis, final ProcessContext 
processContext,
             final SchedulingAgentCallback schedulingAgentCallback, final 
boolean failIfStopping) {
 
-        switch (getValidationStatus()) {
-            case INVALID:
-                throw new IllegalStateException("Processor " + this.getName() 
+ " is not in a valid state due to " + this.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException("Processor " + this.getName() 
+ " cannot be started because its validation is still being performed");
-        }
-
         final Processor processor = processorRef.get().getProcessor();
         final ComponentLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
 
@@ -1487,6 +1481,25 @@ private void initiateStart(final 
ScheduledExecutorService taskScheduler, final l
 
         // Create a task to invoke the @OnScheduled annotation of the processor
         final Callable<Void> startupTask = () -> {
+            final ScheduledState currentScheduleState = scheduledState.get();
+            if (currentScheduleState == ScheduledState.STOPPING || 
currentScheduleState == ScheduledState.STOPPED) {
+                LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle 
methods or begin trigger onTrigger() method", StandardProcessorNode.this);
+                schedulingAgentCallback.onTaskComplete();
+                return null;
+            }
+
+            final ValidationStatus validationStatus = getValidationStatus();
+            if (validationStatus != ValidationStatus.VALID) {
+                LOG.debug("Cannot start {} because Processor is currently not 
valid; will try again after 5 seconds", StandardProcessorNode.this);
+
+                // re-initiate the entire process
+                final Runnable initiateStartTask = () -> 
initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, 
processContext, schedulingAgentCallback);
+                taskScheduler.schedule(initiateStartTask, 5, TimeUnit.SECONDS);
+
+                schedulingAgentCallback.onTaskComplete();
+                return null;
+            }
+
             LOG.debug("Invoking @OnScheduled methods of {}", processor);
 
             // Now that the task has been scheduled, set the timeout
@@ -1696,6 +1709,10 @@ public void run() {
         return future;
     }
 
+    @Override
+    public ScheduledState getDesiredState() {
+        return desiredState;
+    }
 
     private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> 
monitoringFuture, final long completionTimestamp) {
         if (taskFuture.isDone()) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index bce85f86e3..f1b585e5f4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,16 +16,11 @@
  */
 package org.apache.nifi.controller.reporting;
 
-import java.net.URL;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractComponentNode;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -51,6 +46,12 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.AnnotationUtils;
 
+import java.net.URL;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 public abstract class AbstractReportingTaskNode extends AbstractComponentNode 
implements ReportingTaskNode {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractReportingTaskNode.class);
@@ -176,7 +177,7 @@ public boolean isRunning() {
 
     @Override
     public boolean isValidationNecessary() {
-        return !processScheduler.isScheduled(this);
+        return !processScheduler.isScheduled(this) || getValidationStatus() != 
ValidationStatus.VALID;
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 2ff3307533..a7d5fd8a1d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -21,6 +21,7 @@
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
@@ -186,13 +187,6 @@ public void schedule(final ReportingTaskNode taskNode) {
             throw new IllegalStateException("Reporting Task " + 
taskNode.getName() + " cannot be started because it has " + activeThreadCount + 
" threads still running");
         }
 
-        switch (taskNode.getValidationStatus()) {
-            case INVALID:
-                throw new IllegalStateException("Reporting Task " + 
taskNode.getName() + " is not in a valid state for the following reasons: " + 
taskNode.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException("Reporting Task " + 
taskNode.getName() + " cannot be scheduled because it is in the process of 
validating its configuration");
-        }
-
         final SchedulingAgent agent = 
getSchedulingAgent(taskNode.getSchedulingStrategy());
         lifecycleState.setScheduled(true);
 
@@ -216,6 +210,13 @@ public void run() {
                             return;
                         }
 
+                        final ValidationStatus validationStatus = 
taskNode.getValidationStatus();
+                        if (validationStatus != ValidationStatus.VALID) {
+                            LOG.debug("Cannot schedule {} to run because it is 
currently invalid. Will try again in 5 seconds", taskNode);
+                            componentLifeCycleThreadPool.schedule(this, 5, 
TimeUnit.SECONDS);
+                            return;
+                        }
+
                         try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), 
reportingTask.getClass(), reportingTask.getIdentifier())) {
                             
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, 
taskNode.getConfigurationContext());
                         }
@@ -231,8 +232,11 @@ public void run() {
                             + "ReportingTask and will attempt to schedule it 
again after {}",
                             new Object[]{reportingTask, e.toString(), 
administrativeYieldDuration}, e);
 
-                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
reportingTask, taskNode.getConfigurationContext());
-                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
reportingTask, taskNode.getConfigurationContext());
+
+                    try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), 
reportingTask.getClass(), reportingTask.getIdentifier())) {
+                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
reportingTask, taskNode.getConfigurationContext());
+                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
reportingTask, taskNode.getConfigurationContext());
+                    }
 
                     componentLifeCycleThreadPool.schedule(this, 
administrativeYieldMillis, TimeUnit.MILLISECONDS);
                 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
index 39693b8987..4a6b4206b6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
@@ -30,7 +30,7 @@
     public static final ScheduledStateLookup IDENTITY_LOOKUP = new 
ScheduledStateLookup() {
         @Override
         public ScheduledState getScheduledState(final ProcessorNode procNode) {
-            return procNode.getScheduledState();
+            return procNode.getDesiredState();
         }
 
         @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 795fc8c7d2..3d6329f8fa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,25 +16,6 @@
  */
 package org.apache.nifi.controller.service;
 
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -47,8 +28,7 @@
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractComponentNode;
 import org.apache.nifi.controller.ComponentNode;
@@ -71,6 +51,24 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public class StandardControllerServiceNode extends AbstractComponentNode 
implements ControllerServiceNode {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StandardControllerServiceNode.class);
@@ -319,14 +317,6 @@ public void verifyCanEnable() {
         if (getState() != ControllerServiceState.DISABLED) {
             throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not disabled");
         }
-
-        final ValidationState validationState = getValidationState();
-        switch (validationState.getStatus()) {
-            case INVALID:
-                throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not valid: " + 
validationState.getValidationErrors());
-            case VALIDATING:
-                throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because its validation has not yet completed");
-        }
     }
 
     @Override
@@ -334,11 +324,6 @@ public void verifyCanEnable(final 
Set<ControllerServiceNode> ignoredReferences)
         if (getState() != ControllerServiceState.DISABLED) {
             throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not disabled");
         }
-
-        final Collection<ValidationResult> validationErrors = 
getValidationErrors(ignoredReferences);
-        if (ignoredReferences != null && !validationErrors.isEmpty()) {
-            throw new IllegalStateException("Controller Service with ID " + 
getIdentifier() + " cannot be enabled because it is not currently valid: " + 
validationErrors);
-        }
     }
 
     @Override
@@ -389,8 +374,11 @@ public boolean isValidationNecessary() {
             case DISABLED:
             case DISABLING:
                 return true;
-            case ENABLED:
             case ENABLING:
+                // If enabling and currently not valid, then we must trigger validation to occur. This allows the #enable method
+                // to continue running in the background and complete enabling when the service becomes valid.
+                return getValidationStatus() != ValidationStatus.VALID;
+            case ENABLED:
             default:
                 return false;
         }
@@ -398,7 +386,7 @@ public boolean isValidationNecessary() {
 
     /**
      * Will atomically enable this service by invoking its @OnEnabled operation.
-     * It uses CAS operation on {@link #stateRef} to transition this service
+     * It uses CAS operation on {@link #stateTransition} to transition this service
      * from DISABLED to ENABLING state. If such transition succeeds the service
      * will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
      * If such transition doesn't succeed then no enabling logic will be
@@ -429,6 +417,20 @@ public boolean isValidationNecessary() {
             scheduler.execute(new Runnable() {
                 @Override
                 public void run() {
+                    if (!isActive()) {
+                        LOG.debug("{} is no longer active so will not attempt 
to enable it", StandardControllerServiceNode.this);
+                        stateTransition.disable();
+                        return;
+                    }
+
+                    final ValidationStatus validationStatus = 
getValidationStatus();
+                    if (validationStatus != ValidationStatus.VALID) {
+                        LOG.debug("Cannot enable {} because it is not 
currently valid. Will try again in 5 seconds", 
StandardControllerServiceNode.this);
+                        scheduler.schedule(this, 5, TimeUnit.SECONDS);
+                        future.completeExceptionally(new RuntimeException(this 
+ " cannot be enabled because it is not currently valid. Will try again in 5 
seconds."));
+                        return;
+                    }
+
                     try {
                         try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(getExtensionManager(), 
getControllerServiceImplementation().getClass(), getIdentifier())) {
                             
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, 
getControllerServiceImplementation(), configContext);
@@ -446,7 +448,7 @@ public void run() {
                             invokeDisable(configContext);
                             stateTransition.disable();
                         } else {
-                            LOG.debug("Successfully enabled {}", service);
+                            LOG.info("Successfully enabled {}", service);
                         }
                     } catch (Exception e) {
                         future.completeExceptionally(e);
@@ -478,7 +480,7 @@ public void run() {
 
     /**
      * Will atomically disable this service by invoking its @OnDisabled operation.
-     * It uses CAS operation on {@link #stateRef} to transition this service
+     * It uses CAS operation on {@link #stateTransition} to transition this service
      * from ENABLED to DISABLING state. If such transition succeeds the service
      * will be de-activated (see {@link ControllerServiceNode#isActive()}).
      * If such transition doesn't succeed (the service is still in ENABLING state)
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 920999e4d6..242c0ada87 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -24,6 +24,7 @@
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -92,6 +93,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -342,6 +344,8 @@ public void validateServiceEnablementLogicHappensOnlyOnce() 
throws Exception {
         final ControllerServiceNode serviceNode = 
flowManager.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, 
false, true);
 
+        serviceNode.performValidation();
+
         assertFalse(serviceNode.isActive());
         final SimpleTestService ts = (SimpleTestService) 
serviceNode.getControllerServiceImplementation();
         final ExecutorService executor = Executors.newCachedThreadPool();
@@ -361,10 +365,10 @@ public void run() {
                 }
             });
         }
-        // need to sleep a while since we are emulating async invocations on
-        // method that is also internally async
-        Thread.sleep(500);
+
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+
         assertFalse(asyncFailed.get());
         assertEquals(1, ts.enableInvocationCount());
     }
@@ -399,10 +403,9 @@ public void run() {
                 }
             });
         }
-        // need to sleep a while since we are emulating async invocations on
-        // method that is also internally async
-        Thread.sleep(500);
+
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
         assertFalse(asyncFailed.get());
         assertEquals(0, ts.disableInvocationCount());
     }
@@ -419,8 +422,10 @@ public void validateEnabledServiceCanOnlyBeDisabledOnce() 
throws Exception {
         final ControllerServiceNode serviceNode = 
flowManager.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, 
false, true);
 
+        assertSame(ValidationStatus.VALID, serviceNode.performValidation());
+
         final SimpleTestService ts = (SimpleTestService) 
serviceNode.getControllerServiceImplementation();
-        scheduler.enableControllerService(serviceNode);
+        scheduler.enableControllerService(serviceNode).get();
         assertTrue(serviceNode.isActive());
         final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -441,8 +446,8 @@ public void run() {
         }
         // need to sleep a while since we are emulating async invocations on
         // method that is also internally async
-        Thread.sleep(500);
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS); // change to seconds.
         assertFalse(asyncFailed.get());
         assertEquals(1, ts.disableInvocationCount());
     }
@@ -453,9 +458,17 @@ public void validateDisablingOfTheFailedService() throws 
Exception {
 
         final ControllerServiceNode serviceNode = 
flowManager.createControllerService(FailingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, 
false, true);
-        scheduler.enableControllerService(serviceNode);
-        Thread.sleep(1000);
+        serviceNode.performValidation();
+
+        final Future<?> future = 
scheduler.enableControllerService(serviceNode);
+        try {
+            future.get();
+        } catch (final Exception e) {
+            // Expected behavior because the FailingService throws Exception when attempting to enable
+        }
+
         scheduler.shutdown();
+
         /*
          * Because it was never disabled it will remain active since its
          * enabling is being retried. This may actually be a bug in the
@@ -528,14 +541,20 @@ public void 
validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
 
         final ControllerServiceNode serviceNode = 
flowManager.createControllerService(LongEnablingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, 
false, true);
+
         final LongEnablingService ts = (LongEnablingService) 
serviceNode.getControllerServiceImplementation();
         ts.setLimit(Long.MAX_VALUE);
+
+        serviceNode.performValidation();
         scheduler.enableControllerService(serviceNode);
-        Thread.sleep(100);
+
         assertTrue(serviceNode.isActive());
+        final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        while (ts.enableInvocationCount() != 1 && System.nanoTime() <= 
maxTime) {
+            Thread.sleep(1L);
+        }
         assertEquals(1, ts.enableInvocationCount());
 
-        Thread.sleep(1000);
         scheduler.disableControllerService(serviceNode);
         assertFalse(serviceNode.isActive());
         assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index daee3c8e57..ebeb916b02 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -21,6 +21,7 @@
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.ExtensionBuilder;
 import org.apache.nifi.controller.FlowController;
@@ -73,6 +74,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 public class TestStandardControllerServiceProvider {
@@ -197,8 +199,8 @@ public void testDisableControllerService() {
         provider.disableControllerService(serviceNode);
     }
 
-    @Test(timeout = 1000000)
-    public void testEnableDisableWithReference() {
+    @Test(timeout = 10000)
+    public void testEnableDisableWithReference() throws InterruptedException {
         final ProcessGroup group = new MockProcessGroup(controller);
         final FlowController controller = Mockito.mock(FlowController.class);
         final FlowManager flowManager = Mockito.mock(FlowManager.class);
@@ -221,17 +223,24 @@ public void testEnableDisableWithReference() {
 
         try {
             provider.enableControllerService(serviceNodeA);
-            Assert.fail("Was able to enable Service A but Service B is 
disabled.");
         } catch (final IllegalStateException expected) {
         }
 
+        assertSame(ControllerServiceState.ENABLING, serviceNodeA.getState());
+
         serviceNodeB.performValidation();
-        serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS);
+        assertSame(ValidationStatus.VALID, serviceNodeB.getValidationStatus(5, 
TimeUnit.SECONDS));
         provider.enableControllerService(serviceNodeB);
 
         serviceNodeA.performValidation();
-        serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS);
-        provider.enableControllerService(serviceNodeA);
+        assertSame(ValidationStatus.VALID, serviceNodeA.getValidationStatus(5, 
TimeUnit.SECONDS));
+
+        final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        // Wait for Service A to become ENABLED. This will happen in a background thread after approximately 5 seconds, now that Service A is valid.
+        while (serviceNodeA.getState() != ControllerServiceState.ENABLED && 
System.nanoTime() <= maxTime) {
+            Thread.sleep(5L);
+        }
+        assertSame(ControllerServiceState.ENABLED, serviceNodeA.getState());
 
         try {
             provider.disableControllerService(serviceNodeB);


With regards,
Apache Git Services

Reply via email to