diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 2e2c8f75cb..e76d432d0d 100755
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -59,6 +59,7 @@
import java.sql.Timestamp;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -167,7 +168,7 @@ private static Schema buildAvroSchema(final DataType
dataType, final String fiel
for (final DataType option : options) {
final Schema optionSchema = buildAvroSchema(option,
fieldName, false);
- if (!typesAdded.contains(optionSchema.getType())) {
+ if (optionSchema != null &&
!typesAdded.contains(optionSchema.getType())) {
unionTypes.add(optionSchema);
typesAdded.add(optionSchema.getType());
}
@@ -406,7 +407,7 @@ public static RecordSchema createSchema(final Schema
avroSchema, final String sc
for (final Field field : avroSchema.getFields()) {
final String fieldName = field.name();
final Schema fieldSchema = field.schema();
- final DataType dataType =
AvroTypeUtil.determineDataType(fieldSchema, knownRecords);
+ final DataType dataType = determineDataType(fieldSchema,
knownRecords);
final boolean nullable = isNullable(fieldSchema);
addFieldToList(recordFields, field, fieldName, fieldSchema,
dataType, nullable);
}
@@ -547,7 +548,7 @@ private static void addFieldToList(final List<RecordField>
recordFields, final F
}
private static Long getLongFromTimestamp(final Object rawValue, final
Schema fieldSchema, final String fieldName) {
- final String format =
AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+ final String format = determineDataType(fieldSchema).getFormat();
Timestamp t = DataTypeUtils.toTimestamp(rawValue, () ->
DataTypeUtils.getDateFormat(format), fieldName);
return t.getTime();
}
@@ -566,13 +567,13 @@ private static Object convertToAvroObject(final Object
rawValue, final Schema fi
}
if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
- final String format =
AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+ final String format =
determineDataType(fieldSchema).getFormat();
final Date date = DataTypeUtils.toDate(rawValue, () ->
DataTypeUtils.getDateFormat(format), fieldName);
final Duration duration = Duration.between(new
Date(0L).toInstant(), new Date(date.getTime()).toInstant());
final long days = duration.toDays();
return (int) days;
} else if
(LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
- final String format =
AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+ final String format =
determineDataType(fieldSchema).getFormat();
final Time time = DataTypeUtils.toTime(rawValue, () ->
DataTypeUtils.getDateFormat(format), fieldName);
final Date date = new Date(time.getTime());
final Duration duration =
Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS),
date.toInstant());
@@ -594,7 +595,7 @@ private static Object convertToAvroObject(final Object
rawValue, final Schema fi
final Duration duration =
Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS),
date.toInstant());
return duration.toMillis() * 1000L;
} else if
(LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) {
- final String format =
AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+ final String format =
determineDataType(fieldSchema).getFormat();
Timestamp t = DataTypeUtils.toTimestamp(rawValue, () ->
DataTypeUtils.getDateFormat(format), fieldName);
return getLongFromTimestamp(rawValue, fieldSchema,
fieldName);
} else if
(LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) {
@@ -642,7 +643,7 @@ private static Object convertToAvroObject(final Object
rawValue, final Schema fi
return ByteBuffer.wrap(((String)
rawValue).getBytes(charset));
}
if (rawValue instanceof Object[]) {
- return AvroTypeUtil.convertByteArray((Object[]) rawValue);
+ return convertByteArray((Object[]) rawValue);
} else {
throw new IllegalTypeConversionException("Cannot convert
value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
}
@@ -672,24 +673,38 @@ private static Object convertToAvroObject(final Object
rawValue, final Schema fi
case RECORD:
final GenericData.Record avroRecord = new
GenericData.Record(fieldSchema);
- final Record record = (Record) rawValue;
- for (final RecordField recordField :
record.getSchema().getFields()) {
- final Object recordFieldValue =
record.getValue(recordField);
- final String recordFieldName = recordField.getFieldName();
-
- final Field field = fieldSchema.getField(recordFieldName);
+ final Set<Map.Entry<String, Object>> entries;
+ if (rawValue instanceof Map) {
+ final Map<String, Object> map = (Map<String, Object>)
rawValue;
+ entries = map.entrySet();
+ } else if (rawValue instanceof Record) {
+ entries = new HashSet<>();
+ final Record record = (Record) rawValue;
+ record.getSchema().getFields().forEach(field ->
entries.add(new AbstractMap.SimpleEntry<>(field.getFieldName(),
record.getValue(field))));
+ } else {
+ throw new IllegalTypeConversionException("Cannot convert
value " + rawValue + " of type " + rawValue.getClass() + " to a Record");
+ }
+ for (final Map.Entry<String, Object> e : entries) {
+ final Field field = fieldSchema.getField(e.getKey());
if (field == null) {
continue;
}
- final Object converted =
convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" +
recordFieldName, charset);
- avroRecord.put(recordFieldName, converted);
+ final Object converted = convertToAvroObject(e.getValue(),
field.schema(), fieldName + "/" + e.getKey(), charset);
+ avroRecord.put(e.getKey(), converted);
}
return avroRecord;
case UNION:
return convertUnionFieldValue(rawValue, fieldSchema, schema ->
convertToAvroObject(rawValue, schema, fieldName, charset), fieldName);
case ARRAY:
- final Object[] objectArray = (Object[]) rawValue;
+ final Object[] objectArray;
+ if (rawValue instanceof List) {
+ objectArray = ((List) rawValue).toArray();
+ } else if (rawValue instanceof Object[]) {
+ objectArray = (Object[]) rawValue;
+ } else {
+ throw new IllegalTypeConversionException("Cannot convert
value " + rawValue + " of type " + rawValue.getClass() + " to an Array");
+ }
final List<Object> list = new ArrayList<>(objectArray.length);
int i = 0;
for (final Object o : objectArray) {
@@ -773,7 +788,7 @@ private static Object convertUnionFieldValue(final Object
originalValue, final S
}
foundNonNull = true;
- final DataType desiredDataType =
AvroTypeUtil.determineDataType(subSchema);
+ final DataType desiredDataType = determineDataType(subSchema);
try {
final Object convertedValue = conversion.apply(subSchema);
@@ -891,7 +906,7 @@ private static Object normalizeValue(final Object value,
final Schema avroSchema
final Object fieldValue = normalizeValue(avroFieldValue,
field.schema(), fieldName + "/" + field.name());
values.put(field.name(), fieldValue);
}
- final RecordSchema childSchema =
AvroTypeUtil.createSchema(recordSchema, false);
+ final RecordSchema childSchema = createSchema(recordSchema,
false);
return new MapRecord(childSchema, values);
case BYTES:
final ByteBuffer bb = (ByteBuffer) value;
@@ -899,10 +914,10 @@ private static Object normalizeValue(final Object value,
final Schema avroSchema
if (logicalType != null &&
LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
return new Conversions.DecimalConversion().fromBytes(bb,
avroSchema, logicalType);
}
- return AvroTypeUtil.convertByteArray(bb.array());
+ return convertByteArray(bb.array());
case FIXED:
final GenericFixed fixed = (GenericFixed) value;
- return AvroTypeUtil.convertByteArray(fixed.bytes());
+ return convertByteArray(fixed.bytes());
case ENUM:
return value.toString();
case NULL:
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 1f6c29bfb3..a68e5bcd52 100755
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -450,4 +450,21 @@ public void testBytesToStringConversion() {
assertTrue(o instanceof String);
assertEquals("Hello", o);
}
+ @Test
+ public void testListToArrayConversion() { // a java.util.List input is accepted for an Avro ARRAY schema
+ final Charset charset = Charset.forName("UTF_32LE");
+ Object o =
AvroTypeUtil.convertToAvroObject(Collections.singletonList("Hello"),
Schema.createArray(Schema.create(Type.STRING)), charset);
+ assertTrue(o instanceof List);
+ assertEquals(1, ((List<?>) o).size()); // wildcard cast instead of a raw List
+ assertEquals("Hello", ((List<?>) o).get(0));
+ }
+
+ @Test
+ public void testMapToRecordConversion() {
+ final Charset charset = Charset.forName("UTF_32LE");
+ Object o =
AvroTypeUtil.convertToAvroObject(Collections.singletonMap("Hello", "World"),
+ Schema.createRecord(Collections.singletonList(new
Field("Hello", Schema.create(Type.STRING), "", ""))), charset);
+ assertTrue(o instanceof Record);
+ assertEquals("World", ((Record) o).get("Hello"));
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index f3ae41fc1e..e17682eeed 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -373,9 +373,8 @@ public String toString() {
}
@Override
- public final void performValidation() {
- boolean replaced = false;
- do {
+ public final ValidationStatus performValidation() {
+ while (true) {
final ValidationState validationState = getValidationState();
final ValidationContext validationContext = getValidationContext();
@@ -391,8 +390,11 @@ public final void performValidation() {
final ValidationStatus status = results.isEmpty() ?
ValidationStatus.VALID : ValidationStatus.INVALID;
final ValidationState updatedState = new ValidationState(status,
results);
- replaced = replaceValidationState(validationState, updatedState);
- } while (!replaced);
+ final boolean replaced = replaceValidationState(validationState,
updatedState);
+ if (replaced) {
+ return status;
+ }
+ }
}
protected Collection<ValidationResult> computeValidationErrors(final
ValidationContext validationContext) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index d0ed572515..2357d41cd0 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -179,7 +179,7 @@ public default void setProperties(Map<String, String>
properties) {
/**
* Asynchronously begins the validation process
*/
- public abstract void performValidation();
+ public abstract ValidationStatus performValidation();
/**
* Returns a {@link List} of all {@link PropertyDescriptor}s that this
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 6e8206eb71..12eeb88f23 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.scheduling.LifecycleState;
@@ -144,7 +145,13 @@ public ProcessorNode(final String id,
public ScheduledState getScheduledState() {
ScheduledState sc = this.scheduledState.get();
if (sc == ScheduledState.STARTING) {
- return ScheduledState.RUNNING;
+ final ValidationStatus validationStatus = getValidationStatus();
+
+ if (validationStatus == ValidationStatus.INVALID) {
+ return ScheduledState.STOPPED;
+ } else {
+ return ScheduledState.RUNNING;
+ }
} else if (sc == ScheduledState.STOPPING) {
return ScheduledState.STOPPED;
}
@@ -240,4 +247,12 @@ public abstract void start(ScheduledExecutorService
scheduler, long administrati
* will result in the WARN message if processor can not be enabled.
*/
public abstract void disable();
+
+ /**
+ * Returns the Scheduled State that is desired for this Processor. This
may vary from the current state if the Processor is not
+ * currently valid, is in the process of stopping but should then
transition to Running, etc.
+ *
+ * @return the desired state for this Processor
+ */
+ public abstract ScheduledState getDesiredState();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index dad01b8a24..8ab7e69d54 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -912,7 +912,6 @@ public void trigger(final ComponentNode component) {
try {
if (connectable instanceof ProcessorNode) {
- ((ProcessorNode)
connectable).getValidationStatus(5, TimeUnit.SECONDS);
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
} else {
startConnectable(connectable);
@@ -1242,7 +1241,7 @@ public ScheduledState getScheduledState(final
ProcessorNode procNode) {
return ScheduledState.RUNNING;
}
- return procNode.getScheduledState();
+ return procNode.getDesiredState();
}
@Override
@@ -1699,7 +1698,6 @@ public void startReportingTask(final ReportingTaskNode
reportingTaskNode) {
throw new IllegalStateException("Cannot start reporting task " +
reportingTaskNode.getIdentifier() + " because the controller is terminated");
}
- reportingTaskNode.performValidation(); // ensure that the reporting
task has completed its validation before attempting to start it
reportingTaskNode.verifyCanStart();
reportingTaskNode.reloadAdditionalResourcesIfNecessary();
processScheduler.schedule(reportingTaskNode);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 4adfad49b9..8ca51731e2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -38,6 +38,7 @@
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@@ -139,7 +140,7 @@
private final ProcessScheduler processScheduler;
private long runNanos = 0L;
private volatile long yieldNanos;
- private volatile ScheduledState desiredState;
+ private volatile ScheduledState desiredState = ScheduledState.STOPPED;
private volatile LogLevel bulletinLevel = LogLevel.WARN;
private SchedulingStrategy schedulingStrategy; // guarded by read/write
lock
@@ -1343,13 +1344,6 @@ public void disable() {
public void start(final ScheduledExecutorService taskScheduler, final long
administrativeYieldMillis, final long timeoutMillis, final ProcessContext
processContext,
final SchedulingAgentCallback schedulingAgentCallback, final
boolean failIfStopping) {
- switch (getValidationStatus()) {
- case INVALID:
- throw new IllegalStateException("Processor " + this.getName()
+ " is not in a valid state due to " + this.getValidationErrors());
- case VALIDATING:
- throw new IllegalStateException("Processor " + this.getName()
+ " cannot be started because its validation is still being performed");
- }
-
final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
@@ -1487,6 +1481,25 @@ private void initiateStart(final
ScheduledExecutorService taskScheduler, final l
// Create a task to invoke the @OnScheduled annotation of the processor
final Callable<Void> startupTask = () -> {
+ final ScheduledState currentScheduleState = scheduledState.get();
+ if (currentScheduleState == ScheduledState.STOPPING ||
currentScheduleState == ScheduledState.STOPPED) {
+ LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle
methods or begin trigger onTrigger() method", StandardProcessorNode.this);
+ schedulingAgentCallback.onTaskComplete();
+ return null;
+ }
+
+ final ValidationStatus validationStatus = getValidationStatus();
+ if (validationStatus != ValidationStatus.VALID) {
+ LOG.debug("Cannot start {} because Processor is currently not
valid; will try again after 5 seconds", StandardProcessorNode.this);
+
+ // re-initiate the entire process
+ final Runnable initiateStartTask = () ->
initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis,
processContext, schedulingAgentCallback);
+ taskScheduler.schedule(initiateStartTask, 5, TimeUnit.SECONDS);
+
+ schedulingAgentCallback.onTaskComplete();
+ return null;
+ }
+
LOG.debug("Invoking @OnScheduled methods of {}", processor);
// Now that the task has been scheduled, set the timeout
@@ -1696,6 +1709,10 @@ public void run() {
return future;
}
+ @Override
+ public ScheduledState getDesiredState() {
+ return desiredState;
+ }
private void monitorAsyncTask(final Future<?> taskFuture, final Future<?>
monitoringFuture, final long completionTimestamp) {
if (taskFuture.isDone()) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index bce85f86e3..f1b585e5f4 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,16 +16,11 @@
*/
package org.apache.nifi.controller.reporting;
-import java.net.URL;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.AbstractComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
@@ -51,6 +46,12 @@
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotationUtils;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
public abstract class AbstractReportingTaskNode extends AbstractComponentNode
implements ReportingTaskNode {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractReportingTaskNode.class);
@@ -176,7 +177,7 @@ public boolean isRunning() {
@Override
public boolean isValidationNecessary() {
- return !processScheduler.isScheduled(this);
+ return !processScheduler.isScheduled(this) || getValidationStatus() !=
ValidationStatus.VALID;
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 2ff3307533..a7d5fd8a1d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -21,6 +21,7 @@
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
@@ -186,13 +187,6 @@ public void schedule(final ReportingTaskNode taskNode) {
throw new IllegalStateException("Reporting Task " +
taskNode.getName() + " cannot be started because it has " + activeThreadCount +
" threads still running");
}
- switch (taskNode.getValidationStatus()) {
- case INVALID:
- throw new IllegalStateException("Reporting Task " +
taskNode.getName() + " is not in a valid state for the following reasons: " +
taskNode.getValidationErrors());
- case VALIDATING:
- throw new IllegalStateException("Reporting Task " +
taskNode.getName() + " cannot be scheduled because it is in the process of
validating its configuration");
- }
-
final SchedulingAgent agent =
getSchedulingAgent(taskNode.getSchedulingStrategy());
lifecycleState.setScheduled(true);
@@ -216,6 +210,13 @@ public void run() {
return;
}
+ final ValidationStatus validationStatus =
taskNode.getValidationStatus();
+ if (validationStatus != ValidationStatus.VALID) {
+ LOG.debug("Cannot schedule {} to run because it is
currently invalid. Will try again in 5 seconds", taskNode);
+ componentLifeCycleThreadPool.schedule(this, 5,
TimeUnit.SECONDS);
+ return;
+ }
+
try (final NarCloseable x =
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(),
reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask,
taskNode.getConfigurationContext());
}
@@ -231,8 +232,11 @@ public void run() {
+ "ReportingTask and will attempt to schedule it
again after {}",
new Object[]{reportingTask, e.toString(),
administrativeYieldDuration}, e);
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class,
reportingTask, taskNode.getConfigurationContext());
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class,
reportingTask, taskNode.getConfigurationContext());
+
+ try (final NarCloseable x =
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(),
reportingTask.getClass(), reportingTask.getIdentifier())) {
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class,
reportingTask, taskNode.getConfigurationContext());
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class,
reportingTask, taskNode.getConfigurationContext());
+ }
componentLifeCycleThreadPool.schedule(this,
administrativeYieldMillis, TimeUnit.MILLISECONDS);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
index 39693b8987..4a6b4206b6 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
@@ -30,7 +30,7 @@
public static final ScheduledStateLookup IDENTITY_LOOKUP = new
ScheduledStateLookup() {
@Override
public ScheduledState getScheduledState(final ProcessorNode procNode) {
- return procNode.getScheduledState();
+ return procNode.getDesiredState();
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 795fc8c7d2..3d6329f8fa 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,25 +16,6 @@
*/
package org.apache.nifi.controller.service;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -47,8 +28,7 @@
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.AbstractComponentNode;
import org.apache.nifi.controller.ComponentNode;
@@ -71,6 +51,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
public class StandardControllerServiceNode extends AbstractComponentNode
implements ControllerServiceNode {
private static final Logger LOG =
LoggerFactory.getLogger(StandardControllerServiceNode.class);
@@ -319,14 +317,6 @@ public void verifyCanEnable() {
if (getState() != ControllerServiceState.DISABLED) {
throw new
IllegalStateException(getControllerServiceImplementation().getIdentifier() + "
cannot be enabled because it is not disabled");
}
-
- final ValidationState validationState = getValidationState();
- switch (validationState.getStatus()) {
- case INVALID:
- throw new
IllegalStateException(getControllerServiceImplementation().getIdentifier() + "
cannot be enabled because it is not valid: " +
validationState.getValidationErrors());
- case VALIDATING:
- throw new
IllegalStateException(getControllerServiceImplementation().getIdentifier() + "
cannot be enabled because its validation has not yet completed");
- }
}
@Override
@@ -334,11 +324,6 @@ public void verifyCanEnable(final
Set<ControllerServiceNode> ignoredReferences)
if (getState() != ControllerServiceState.DISABLED) {
throw new
IllegalStateException(getControllerServiceImplementation().getIdentifier() + "
cannot be enabled because it is not disabled");
}
-
- final Collection<ValidationResult> validationErrors =
getValidationErrors(ignoredReferences);
- if (ignoredReferences != null && !validationErrors.isEmpty()) {
- throw new IllegalStateException("Controller Service with ID " +
getIdentifier() + " cannot be enabled because it is not currently valid: " +
validationErrors);
- }
}
@Override
@@ -389,8 +374,11 @@ public boolean isValidationNecessary() {
case DISABLED:
case DISABLING:
return true;
- case ENABLED:
case ENABLING:
+ // If enabling and currently not valid, then we must trigger
validation to occur. This allows the #enable method
+ // to continue running in the background and complete enabling
when the service becomes valid.
+ return getValidationStatus() != ValidationStatus.VALID;
+ case ENABLED:
default:
return false;
}
@@ -398,7 +386,7 @@ public boolean isValidationNecessary() {
/**
* Will atomically enable this service by invoking its @OnEnabled
operation.
- * It uses CAS operation on {@link #stateRef} to transition this service
+ * It uses CAS operation on {@link #stateTransition} to transition this
service
* from DISABLED to ENABLING state. If such transition succeeds the service
* will be marked as 'active' (see {@link
ControllerServiceNode#isActive()}).
* If such transition doesn't succeed then no enabling logic will be
@@ -429,6 +417,20 @@ public boolean isValidationNecessary() {
scheduler.execute(new Runnable() {
@Override
public void run() {
+ if (!isActive()) {
+ LOG.debug("{} is no longer active so will not attempt
to enable it", StandardControllerServiceNode.this);
+ stateTransition.disable();
+ return;
+ }
+
+ final ValidationStatus validationStatus =
getValidationStatus();
+ if (validationStatus != ValidationStatus.VALID) {
+ LOG.debug("Cannot enable {} because it is not
currently valid. Will try again in 5 seconds",
StandardControllerServiceNode.this);
+ scheduler.schedule(this, 5, TimeUnit.SECONDS);
+ future.completeExceptionally(new RuntimeException(this
+ " cannot be enabled because it is not currently valid. Will try again in 5
seconds."));
+ return;
+ }
+
try {
try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(getExtensionManager(),
getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class,
getControllerServiceImplementation(), configContext);
@@ -446,7 +448,7 @@ public void run() {
invokeDisable(configContext);
stateTransition.disable();
} else {
- LOG.debug("Successfully enabled {}", service);
+ LOG.info("Successfully enabled {}", service);
}
} catch (Exception e) {
future.completeExceptionally(e);
@@ -478,7 +480,7 @@ public void run() {
/**
* Will atomically disable this service by invoking its @OnDisabled
operation.
- * It uses CAS operation on {@link #stateRef} to transition this service
+ * It uses CAS operation on {@link #stateTransition} to transition this
service
* from ENABLED to DISABLING state. If such transition succeeds the service
* will be de-activated (see {@link ControllerServiceNode#isActive()}).
* If such transition doesn't succeed (the service is still in ENABLING
state)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 920999e4d6..242c0ada87 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -24,6 +24,7 @@
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@@ -92,6 +93,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -342,6 +344,8 @@ public void validateServiceEnablementLogicHappensOnlyOnce()
throws Exception {
final ControllerServiceNode serviceNode =
flowManager.createControllerService(SimpleTestService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null,
false, true);
+ serviceNode.performValidation();
+
assertFalse(serviceNode.isActive());
final SimpleTestService ts = (SimpleTestService)
serviceNode.getControllerServiceImplementation();
final ExecutorService executor = Executors.newCachedThreadPool();
@@ -361,10 +365,10 @@ public void run() {
}
});
}
- // need to sleep a while since we are emulating async invocations on
- // method that is also internally async
- Thread.sleep(500);
+
executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+
assertFalse(asyncFailed.get());
assertEquals(1, ts.enableInvocationCount());
}
@@ -399,10 +403,9 @@ public void run() {
}
});
}
- // need to sleep a while since we are emulating async invocations on
- // method that is also internally async
- Thread.sleep(500);
+
executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
assertFalse(asyncFailed.get());
assertEquals(0, ts.disableInvocationCount());
}
@@ -419,8 +422,10 @@ public void validateEnabledServiceCanOnlyBeDisabledOnce()
throws Exception {
final ControllerServiceNode serviceNode =
flowManager.createControllerService(SimpleTestService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null,
false, true);
+ assertSame(ValidationStatus.VALID, serviceNode.performValidation());
+
final SimpleTestService ts = (SimpleTestService)
serviceNode.getControllerServiceImplementation();
- scheduler.enableControllerService(serviceNode);
+ scheduler.enableControllerService(serviceNode).get();
assertTrue(serviceNode.isActive());
final ExecutorService executor = Executors.newCachedThreadPool();
@@ -441,8 +446,8 @@ public void run() {
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
- Thread.sleep(500);
executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS); // change to seconds.
assertFalse(asyncFailed.get());
assertEquals(1, ts.disableInvocationCount());
}
@@ -453,9 +458,17 @@ public void validateDisablingOfTheFailedService() throws
Exception {
final ControllerServiceNode serviceNode =
flowManager.createControllerService(FailingService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null,
false, true);
- scheduler.enableControllerService(serviceNode);
- Thread.sleep(1000);
+ serviceNode.performValidation();
+
+ final Future<?> future =
scheduler.enableControllerService(serviceNode);
+ try {
+ future.get();
+ } catch (final Exception e) {
+ // Expected behavior because the FailingService throws Exception
when attempting to enable
+ }
+
scheduler.shutdown();
+
/*
* Because it was never disabled it will remain active since its
* enabling is being retried. This may actually be a bug in the
@@ -528,14 +541,20 @@ public void
validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
final ControllerServiceNode serviceNode =
flowManager.createControllerService(LongEnablingService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null,
false, true);
+
final LongEnablingService ts = (LongEnablingService)
serviceNode.getControllerServiceImplementation();
ts.setLimit(Long.MAX_VALUE);
+
+ serviceNode.performValidation();
scheduler.enableControllerService(serviceNode);
- Thread.sleep(100);
+
assertTrue(serviceNode.isActive());
+ final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+ while (ts.enableInvocationCount() != 1 && System.nanoTime() <=
maxTime) {
+ Thread.sleep(1L);
+ }
assertEquals(1, ts.enableInvocationCount());
- Thread.sleep(1000);
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index daee3c8e57..ebeb916b02 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -21,6 +21,7 @@
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.ExtensionBuilder;
import org.apache.nifi.controller.FlowController;
@@ -73,6 +74,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class TestStandardControllerServiceProvider {
@@ -197,8 +199,8 @@ public void testDisableControllerService() {
provider.disableControllerService(serviceNode);
}
- @Test(timeout = 1000000)
- public void testEnableDisableWithReference() {
+ @Test(timeout = 10000)
+ public void testEnableDisableWithReference() throws InterruptedException {
final ProcessGroup group = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
final FlowManager flowManager = Mockito.mock(FlowManager.class);
@@ -221,17 +223,24 @@ public void testEnableDisableWithReference() {
try {
provider.enableControllerService(serviceNodeA);
- Assert.fail("Was able to enable Service A but Service B is
disabled.");
} catch (final IllegalStateException expected) {
}
+ assertSame(ControllerServiceState.ENABLING, serviceNodeA.getState());
+
serviceNodeB.performValidation();
- serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS);
+ assertSame(ValidationStatus.VALID, serviceNodeB.getValidationStatus(5,
TimeUnit.SECONDS));
provider.enableControllerService(serviceNodeB);
serviceNodeA.performValidation();
- serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS);
- provider.enableControllerService(serviceNodeA);
+ assertSame(ValidationStatus.VALID, serviceNodeA.getValidationStatus(5,
TimeUnit.SECONDS));
+
+ final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+ // Wait for Service A to become ENABLED. This will happen in a
background thread after approximately 5 seconds, now that Service A is valid.
+ while (serviceNodeA.getState() != ControllerServiceState.ENABLED &&
System.nanoTime() <= maxTime) {
+ Thread.sleep(5L);
+ }
+ assertSame(ControllerServiceState.ENABLED, serviceNodeA.getState());
try {
provider.disableControllerService(serviceNodeB);
With regards,
Apache Git Services