This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d36ce3ccaa NIFI-15365 Fixed Verification for Temporary Processors and
Controller Services (#10688)
d36ce3ccaa is described below
commit d36ce3ccaa9ecdc8f517e5276d4809fb27377c2e
Author: Bob Paulin <[email protected]>
AuthorDate: Wed Jan 21 22:00:17 2026 -0600
NIFI-15365 Fixed Verification for Temporary Processors and Controller
Services (#10688)
- Use new InstanceClassLoader to load additional classpath urls
- Initialize component using VerifiableComponentFactory to set references
from existing component
- Allow Instance and InstanceClassLoader to be GC and closed by calling
OnRemove
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/controller/StandardProcessorNode.java | 24 ++++--
.../service/StandardControllerServiceNode.java | 26 ++++--
.../validation/VerifiableComponentFactory.java | 48 +++++++++++
.../nifi/controller/AbstractComponentNode.java | 6 +-
.../StandardVerifiableComponentFactory.java | 87 ++++++++++++++++++++
.../apache/nifi/controller/ExtensionBuilder.java | 20 ++++-
.../org/apache/nifi/controller/FlowController.java | 8 ++
.../nifi/controller/flow/StandardFlowManager.java | 2 +
.../scheduling/TestStandardProcessScheduler.java | 16 +++-
.../StandardControllerServiceProviderTest.java | 2 +
.../TestStandardControllerServiceProvider.java | 5 +-
.../nifi/stateless/engine/ComponentBuilder.java | 7 +-
.../stateless/engine/StandardStatelessEngine.java | 8 ++
.../nifi/stateless/engine/StatelessEngine.java | 3 +
.../StatelessVerifiableComponentFactory.java | 92 ++++++++++++++++++++++
15 files changed, 330 insertions(+), 24 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 20f5363744..c219d2c45c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -25,6 +25,7 @@ import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -45,6 +46,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationState;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@@ -160,6 +162,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
private final AtomicLong schedulingNanos;
private final AtomicReference<String> versionedComponentId = new
AtomicReference<>();
private final ProcessScheduler processScheduler;
+ private final VerifiableComponentFactory verifiableComponentFactory;
private long runNanos = 0L;
private volatile long yieldNanos;
private volatile ScheduledState desiredState = ScheduledState.STOPPED;
@@ -181,23 +184,25 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
public StandardProcessorNode(final LoggableComponent<Processor> processor,
final String uuid,
final ValidationContextFactory
validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider
controllerServiceProvider, final ReloadComponent reloadComponent,
- final ExtensionManager extensionManager,
final ValidationTrigger validationTrigger) {
+ final VerifiableComponentFactory
verifiableComponentFactory, final ExtensionManager extensionManager,
+ final ValidationTrigger validationTrigger) {
this(processor, uuid, validationContextFactory, scheduler,
controllerServiceProvider, processor.getComponent().getClass().getSimpleName(),
- processor.getComponent().getClass().getCanonicalName(),
reloadComponent, extensionManager, validationTrigger, false);
+ processor.getComponent().getClass().getCanonicalName(),
reloadComponent, verifiableComponentFactory, extensionManager,
validationTrigger, false);
}
public StandardProcessorNode(final LoggableComponent<Processor> processor,
final String uuid,
final ValidationContextFactory
validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider
controllerServiceProvider, final String componentType, final String
componentCanonicalClass,
- final ReloadComponent reloadComponent, final
ExtensionManager extensionManager, final ValidationTrigger validationTrigger,
- final boolean isExtensionMissing) {
+ final ReloadComponent reloadComponent, final
VerifiableComponentFactory verifiableComponentFactory, final ExtensionManager
extensionManager,
+ final ValidationTrigger validationTrigger,
final boolean isExtensionMissing) {
super(uuid, validationContextFactory, controllerServiceProvider,
componentType, componentCanonicalClass, reloadComponent,
extensionManager, validationTrigger, isExtensionMissing);
final ProcessorDetails processorDetails = new
ProcessorDetails(processor);
this.processorRef = new AtomicReference<>(processorDetails);
+ this.verifiableComponentFactory = verifiableComponentFactory;
identifier = uuid;
destinations = new ConcurrentHashMap<>();
@@ -1002,7 +1007,8 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
// Check if the given configuration requires a different
classloader than the current configuration
final boolean classpathDifferent =
isClasspathDifferent(context.getProperties());
- if (classpathDifferent) {
+ if (classpathDifferent ||
isReloadAdditionalResourcesNecessary()) {
+ LOG.debug("Classpath reload required. Create temporary
InstanceClassLoader for verification");
// Create a classloader for the given configuration and
use that to verify the component's configuration
final Bundle bundle =
extensionManager.getBundle(getBundleCoordinate());
final Set<URL> classpathUrls =
getAdditionalClasspathResources(context.getProperties().keySet(), descriptor ->
context.getProperty(descriptor).getValue());
@@ -1013,7 +1019,12 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
try (final InstanceClassLoader detectedClassLoader =
extensionManager.createInstanceClassLoader(getComponentType(), getIdentifier(),
bundle, classpathUrls, false,
classloaderIsolationKey)) {
Thread.currentThread().setContextClassLoader(detectedClassLoader);
- results.addAll(verifiable.verify(context, logger,
attributes));
+ final VerifiableProcessor tempVerifiable =
verifiableComponentFactory.createProcessor(this, detectedClassLoader);
+ try {
+ results.addAll(tempVerifiable.verify(context,
logger, attributes));
+ } finally {
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class,
tempVerifiable, context);
+ }
} finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
}
@@ -2167,4 +2178,5 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
serviceNode.updateReference(this, descriptor);
}
}
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index a530513cf0..a33d64d365 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Resource;
@@ -38,6 +39,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationState;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.controller.AbstractComponentNode;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
@@ -117,22 +119,23 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
private volatile String comment;
private volatile ProcessGroup processGroup;
private volatile LogLevel bulletinLevel = LogLevel.WARN;
+ private final VerifiableComponentFactory verifiableComponentFactory;
private final AtomicBoolean active;
public StandardControllerServiceNode(final
LoggableComponent<ControllerService> implementation, final
LoggableComponent<ControllerService> proxiedControllerService,
final
ControllerServiceInvocationHandler invocationHandler, final String id, final
ValidationContextFactory validationContextFactory,
- final ControllerServiceProvider
serviceProvider, final ReloadComponent reloadComponent,
+ final ControllerServiceProvider
serviceProvider, final ReloadComponent reloadComponent, final
VerifiableComponentFactory verifiableComponentFactory,
final ExtensionManager
extensionManager, final ValidationTrigger validationTrigger) {
this(implementation, proxiedControllerService, invocationHandler, id,
validationContextFactory, serviceProvider,
implementation.getComponent().getClass().getSimpleName(),
- implementation.getComponent().getClass().getCanonicalName(),
reloadComponent, extensionManager, validationTrigger, false);
+ implementation.getComponent().getClass().getCanonicalName(),
reloadComponent, verifiableComponentFactory, extensionManager,
validationTrigger, false);
}
public StandardControllerServiceNode(final
LoggableComponent<ControllerService> implementation, final
LoggableComponent<ControllerService> proxiedControllerService,
final
ControllerServiceInvocationHandler invocationHandler, final String id, final
ValidationContextFactory validationContextFactory,
final ControllerServiceProvider
serviceProvider, final String componentType, final String
componentCanonicalClass,
- final ReloadComponent
reloadComponent, final ExtensionManager extensionManager,
+ final ReloadComponent
reloadComponent, final VerifiableComponentFactory verifiableComponentFactory,
final ExtensionManager extensionManager,
final ValidationTrigger
validationTrigger, final boolean isExtensionMissing) {
super(id, validationContextFactory, serviceProvider, componentType,
componentCanonicalClass, reloadComponent, extensionManager, validationTrigger,
isExtensionMissing);
@@ -140,6 +143,7 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
this.active = new AtomicBoolean();
setControllerServiceAndProxy(implementation, proxiedControllerService,
invocationHandler);
stateTransition = new ServiceStateTransition(this);
+ this.verifiableComponentFactory = verifiableComponentFactory;
this.comment = "";
}
@@ -531,17 +535,26 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
// Check if the given configuration requires a different
classloader than the current configuration
final boolean classpathDifferent =
isClasspathDifferent(context.getProperties());
- if (classpathDifferent) {
+ if (classpathDifferent ||
isReloadAdditionalResourcesNecessary()) {
+ LOG.debug("Classpath reload required. Create temporary
InstanceClassLoader for verification");
// Create a classloader for the given configuration and
use that to verify the component's configuration
final Bundle bundle =
extensionManager.getBundle(getBundleCoordinate());
final Set<URL> classpathUrls =
getAdditionalClasspathResources(context.getProperties().keySet(), descriptor ->
context.getProperty(descriptor).getValue());
final ClassLoader currentClassLoader =
Thread.currentThread().getContextClassLoader();
final String classLoaderIsolationKey =
getClassLoaderIsolationKey(context);
- try (final InstanceClassLoader detectedClassLoader =
extensionManager.createInstanceClassLoader(getComponentType(), getIdentifier(),
bundle, classpathUrls, false,
+
+ try (final InstanceClassLoader detectedClassLoader =
extensionManager.createInstanceClassLoader(getComponentClass().getName(),
getIdentifier(), bundle, classpathUrls, false,
classLoaderIsolationKey)) {
Thread.currentThread().setContextClassLoader(detectedClassLoader);
- results.addAll(verifiable.verify(context, logger,
variables));
+ // Create a temp ControllerService for the initial
verification. Use the InstanceClassLoader to instantiate the Temp Controller
Service
+ // This ensures Class.forName(String) classloading
uses the InstanceClassLoader since that classloader will include the updated
additional classpath urls
+ final VerifiableControllerService tempVerifiable =
verifiableComponentFactory.createControllerService(this, detectedClassLoader);
+ try {
+ results.addAll(tempVerifiable.verify(context,
logger, variables));
+ } finally {
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class,
tempVerifiable, context);
+ }
} finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
}
@@ -916,4 +929,5 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
return selectedDelay;
}
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/VerifiableComponentFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/VerifiableComponentFactory.java
new file mode 100644
index 0000000000..2dbbb1f059
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/VerifiableComponentFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.components.validation;
+
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.VerifiableControllerService;
+import
org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.processor.VerifiableProcessor;
+
+public interface VerifiableComponentFactory {
+
+ /**
+ * Returns an instance with a new <code>ClassLoader</code> based on the
existing <code>ProcessorNode</code>
+ *
+ * @param processorNode the ProcessorNode the new instance is based on
+ * @param classLoader the new classloader
+ * @return the verifiable processor created with new ClassLoader
+ * @throws ProcessorInstantiationException if unable to create the instance
+ */
+ VerifiableProcessor createProcessor(ProcessorNode processorNode,
ClassLoader classLoader) throws ProcessorInstantiationException;
+
+ /**
+ * Returns an instance with a new <code>ClassLoader</code> based on the
existing <code>ControllerServiceNode</code>
+ *
+ * @param serviceNode the ControllerServiceNode the new instance is based
on
+ * @param classLoader the new classloader
+ * @return the verifiable controller service created with the new
ClassLoader
+ * @throws ControllerServiceInstantiationException if unable to create
the instance
+ */
+ VerifiableControllerService createControllerService(ControllerServiceNode
serviceNode, ClassLoader classLoader);
+
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 207b563830..44f2bbf85a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -770,11 +770,13 @@ public abstract class AbstractComponentNode implements
ComponentNode {
public synchronized boolean isReloadAdditionalResourcesNecessary() {
// Components that don't have any PropertyDescriptors marked
`dynamicallyModifiesClasspath`
// won't have the fingerprint i.e. will be null, in such cases do
nothing
- if (additionalResourcesFingerprint == null) {
+ final Set<PropertyDescriptor> descriptors =
this.getProperties().keySet();
+ final boolean dynamicallyModifiesClasspath = descriptors.stream()
+ .anyMatch(PropertyDescriptor::isDynamicClasspathModifier);
+ if (!dynamicallyModifiesClasspath) {
return false;
}
- final Set<PropertyDescriptor> descriptors =
this.getProperties().keySet();
final Set<URL> additionalUrls =
this.getAdditionalClasspathResources(descriptors);
final String newFingerprint =
ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls,
determineClasloaderIsolationKey());
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardVerifiableComponentFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardVerifiableComponentFactory.java
new file mode 100644
index 0000000000..4d8e0e2cd8
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardVerifiableComponentFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.components.validation;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.VerifiableControllerService;
+import
org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import
org.apache.nifi.controller.service.StandardControllerServiceInitializationContext;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.StandardProcessorInitializationContext;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.util.NiFiProperties;
+
+public class StandardVerifiableComponentFactory implements
VerifiableComponentFactory {
+
+ private final FlowController flowController;
+ private final NiFiProperties nifiProperties;
+ public StandardVerifiableComponentFactory(final FlowController
flowController, final NiFiProperties nifiProperties) {
+ this.flowController = flowController;
+ this.nifiProperties = nifiProperties;
+ }
+
+ @Override
+ public VerifiableProcessor createProcessor(final ProcessorNode
processorNode, final ClassLoader classLoader) throws
ProcessorInstantiationException {
+ final VerifiableProcessor verifiableProcessor;
+ final String identifier = processorNode.getIdentifier();
+ final String processorClassName =
processorNode.getProcessor().getClass().getName();
+ try {
+ final Class<?> rawProcessorClass =
Class.forName(processorClassName, true, classLoader);
+ final Class<? extends VerifiableProcessor> processorClass =
rawProcessorClass.asSubclass(VerifiableProcessor.class);
+ verifiableProcessor =
processorClass.getDeclaredConstructor().newInstance();
+
+ final ProcessorInitializationContext tempInitializationContext =
new StandardProcessorInitializationContext(identifier,
processorNode.getLogger(),
+ flowController.getControllerServiceProvider(),
flowController, flowController.createKerberosConfig(nifiProperties));
+ if (verifiableProcessor instanceof Processor processor) {
+ processor.initialize(tempInitializationContext);
+ }
+ } catch (Exception e) {
+ throw new ProcessorInstantiationException("Failed to instantiate
Verifiable Processor Class [%s]".formatted(processorClassName), e);
+ }
+ return verifiableProcessor;
+ }
+
+ @Override
+ public VerifiableControllerService createControllerService(final
ControllerServiceNode serviceNode, final ClassLoader classLoader) {
+ final VerifiableControllerService verifiableControllerService;
+ final String identifier = serviceNode.getIdentifier();
+ final String controllerServiceClassName =
serviceNode.getCanonicalClassName();
+ try {
+ final Class<?> rawControllorServiceClass =
Class.forName(controllerServiceClassName, true, classLoader);
+ final Class<? extends VerifiableControllerService>
controllerServiceClass =
rawControllorServiceClass.asSubclass(VerifiableControllerService.class);
+ verifiableControllerService =
controllerServiceClass.getDeclaredConstructor().newInstance();
+
+ final ControllerServiceInitializationContext
tempInitializationContext = new
StandardControllerServiceInitializationContext(identifier,
+ serviceNode.getLogger(),
+ flowController.getControllerServiceProvider(),
flowController.getStateManagerProvider().getStateManager(identifier),
+ flowController.createKerberosConfig(nifiProperties),
flowController);
+ if (verifiableControllerService instanceof ControllerService
controllerService) {
+ controllerService.initialize(tempInitializationContext);
+ }
+ } catch (Exception e) {
+ throw new ControllerServiceInstantiationException("Failed to
instantiate Verifiable Controller Service Class
[%s]".formatted(controllerServiceClassName), e);
+ }
+ return verifiableControllerService;
+ }
+
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index 1fdf88d326..e5ef2eacda 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -28,6 +28,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import
org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleInstantiationException;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
@@ -117,6 +118,7 @@ public class ExtensionBuilder {
private String classloaderIsolationKey;
private SSLContext systemSslContext;
private PythonBridge pythonBridge;
+ private VerifiableComponentFactory verifiableComponentFactory;
public ExtensionBuilder type(final String type) {
this.type = type;
@@ -221,6 +223,11 @@ public class ExtensionBuilder {
return this;
}
+ public ExtensionBuilder verifiableComponentFactory(final
VerifiableComponentFactory verifiableComponentFactory) {
+ this.verifiableComponentFactory = verifiableComponentFactory;
+ return this;
+ }
+
public ProcessorNode buildProcessor() {
if (identifier == null) {
throw new IllegalStateException("Processor ID must be specified");
@@ -243,6 +250,9 @@ public class ExtensionBuilder {
if (reloadComponent == null) {
throw new IllegalStateException("Reload Component must be
specified");
}
+ if (verifiableComponentFactory == null) {
+ throw new IllegalStateException("Verifiable Component Factory must
be specified");
+ }
boolean creationSuccessful = true;
final StandardLoggingContext loggingContext = new
StandardLoggingContext();
@@ -434,6 +444,9 @@ public class ExtensionBuilder {
if (reloadComponent == null) {
throw new IllegalStateException("Reload Component must be
specified");
}
+ if (verifiableComponentFactory == null) {
+ throw new IllegalStateException("Verifiable Component Factory must
be specified");
+ }
if (stateManagerProvider == null) {
throw new IllegalStateException("State Manager Provider must be
specified");
}
@@ -502,7 +515,7 @@ public class ExtensionBuilder {
final ValidationContextFactory validationContextFactory =
createValidationContextFactory(serviceProvider);
final ProcessorNode procNode = new StandardProcessorNode(processor,
identifier, validationContextFactory, processScheduler, serviceProvider,
- componentType, type, reloadComponent, extensionManager,
validationTrigger, extensionMissing);
+ componentType, type, reloadComponent,
verifiableComponentFactory, extensionManager, validationTrigger,
extensionMissing);
applyDefaultSettings(procNode);
applyDefaultRunDuration(procNode);
@@ -683,7 +696,7 @@ public class ExtensionBuilder {
final ValidationContextFactory validationContextFactory =
createValidationContextFactory(serviceProvider);
final ControllerServiceNode serviceNode = new
StandardControllerServiceNode(originalLoggableComponent,
proxiedLoggableComponent, invocationHandler,
- identifier, validationContextFactory, serviceProvider,
reloadComponent, extensionManager, validationTrigger);
+ identifier, validationContextFactory, serviceProvider,
reloadComponent, verifiableComponentFactory, extensionManager,
validationTrigger);
serviceNode.setName(rawClass.getSimpleName());
// Set Controller Service Node in Logging Context to populate
Process Group information
loggingContext.setComponent(serviceNode);
@@ -770,7 +783,8 @@ public class ExtensionBuilder {
final ValidationContextFactory validationContextFactory =
createValidationContextFactory(serviceProvider);
return new StandardControllerServiceNode(proxiedLoggableComponent,
proxiedLoggableComponent, invocationHandler, identifier,
- validationContextFactory, serviceProvider, componentType,
type, reloadComponent, extensionManager, validationTrigger, true);
+ validationContextFactory, serviceProvider, componentType,
type, reloadComponent, verifiableComponentFactory,
+ extensionManager, validationTrigger, true);
}
private LoggableComponent<Processor> createLoggableProcessor(final
LoggingContext loggingContext) throws ProcessorInstantiationException {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7be88283cb..23f11db5fb 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -52,9 +52,11 @@ import
org.apache.nifi.components.monitor.LongRunningTaskMonitor;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.validation.StandardValidationTrigger;
+import
org.apache.nifi.components.validation.StandardVerifiableComponentFactory;
import org.apache.nifi.components.validation.TriggerValidationTask;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@@ -325,6 +327,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
private final FlowEngine flowAnalysisThreadPool;
private final ValidationTrigger validationTrigger;
private final ReloadComponent reloadComponent;
+ private final VerifiableComponentFactory verifiableComponentFactory;
private final ProvenanceAuthorizableFactory provenanceAuthorizableFactory;
private final UserAwareEventAccess eventAccess;
private final ParameterContextManager parameterContextManager;
@@ -640,6 +643,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
this.snippetManager = new SnippetManager();
this.reloadComponent = new StandardReloadComponent(this);
+ this.verifiableComponentFactory = new
StandardVerifiableComponentFactory(this, this.nifiProperties);
final ProcessGroup rootGroup =
flowManager.createProcessGroup(ComponentIdGenerator.generateId().toString());
rootGroup.setName(FlowManager.DEFAULT_ROOT_GROUP_NAME);
@@ -2087,6 +2091,10 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
return reloadComponent;
}
+ public VerifiableComponentFactory getVerifiableComponentFactory() {
+ return verifiableComponentFactory;
+ }
+
public void startProcessor(final String parentGroupId, final String
processorId) {
startProcessor(parentGroupId, processorId, true);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index c98e84d076..fe07fe94d8 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -358,6 +358,7 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
.nodeTypeProvider(flowController)
.validationTrigger(flowController.getValidationTrigger())
.reloadComponent(flowController.getReloadComponent())
+
.verifiableComponentFactory(flowController.getVerifiableComponentFactory())
.addClasspathUrls(additionalUrls)
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
.stateManagerProvider(flowController.getStateManagerProvider())
@@ -735,6 +736,7 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
.nodeTypeProvider(flowController)
.validationTrigger(flowController.getValidationTrigger())
.reloadComponent(flowController.getReloadComponent())
+
.verifiableComponentFactory(flowController.getVerifiableComponentFactory())
.addClasspathUrls(additionalUrls)
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
.stateManagerProvider(flowController.getStateManagerProvider())
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 3b154d1a59..83f1fb3f68 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -26,6 +26,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ExtensionBuilder;
@@ -194,6 +195,7 @@ public class TestStandardProcessScheduler {
.nodeTypeProvider(Mockito.mock(NodeTypeProvider.class))
.validationTrigger(Mockito.mock(ValidationTrigger.class))
.reloadComponent(Mockito.mock(ReloadComponent.class))
+
.verifiableComponentFactory(Mockito.mock(VerifiableComponentFactory.class))
.stateManagerProvider(Mockito.mock(StateManagerProvider.class))
.extensionManager(extensionManager)
.buildControllerService();
@@ -245,6 +247,7 @@ public class TestStandardProcessScheduler {
proc.initialize(new StandardProcessorInitializationContext(uuid, null,
null, null, KerberosConfig.NOT_CONFIGURED));
final ReloadComponent reloadComponent =
Mockito.mock(ReloadComponent.class);
+ final VerifiableComponentFactory verifiableComponentFactory =
Mockito.mock(VerifiableComponentFactory.class);
final ControllerServiceNode service =
flowManager.createControllerService(NoStartServiceImpl.class.getName(),
"service",
systemBundle.getBundleDetails().getCoordinate(), null, true,
true, null);
@@ -254,7 +257,7 @@ public class TestStandardProcessScheduler {
final LoggableComponent<Processor> loggableComponent = new
LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(),
null);
final ValidationContextFactory validationContextFactory = new
StandardValidationContextFactory(serviceProvider);
final ProcessorNode procNode = new
StandardProcessorNode(loggableComponent, uuid, validationContextFactory,
scheduler,
- serviceProvider, reloadComponent, extensionManager, new
SynchronousValidationTrigger());
+ serviceProvider, reloadComponent, verifiableComponentFactory,
extensionManager, new SynchronousValidationTrigger());
rootGroup.addProcessor(procNode);
@@ -497,10 +500,12 @@ public class TestStandardProcessScheduler {
proc.initialize(new
StandardProcessorInitializationContext(UUID.randomUUID().toString(), null,
null, null, KerberosConfig.NOT_CONFIGURED));
final ReloadComponent reloadComponent =
Mockito.mock(ReloadComponent.class);
+ final VerifiableComponentFactory verifiableComponentFactory =
Mockito.mock(VerifiableComponentFactory.class);
final LoggableComponent<Processor> loggableComponent = new
LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(),
null);
final ProcessorNode procNode = new
StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
- new StandardValidationContextFactory(serviceProvider), scheduler,
serviceProvider, reloadComponent, extensionManager, new
SynchronousValidationTrigger());
+ new StandardValidationContextFactory(serviceProvider), scheduler,
serviceProvider, reloadComponent,
+ verifiableComponentFactory, extensionManager, new
SynchronousValidationTrigger());
procNode.performValidation();
rootGroup.addProcessor(procNode);
@@ -523,11 +528,12 @@ public class TestStandardProcessScheduler {
proc.initialize(new
StandardProcessorInitializationContext(UUID.randomUUID().toString(), null,
null, null, KerberosConfig.NOT_CONFIGURED));
final ReloadComponent reloadComponent =
Mockito.mock(ReloadComponent.class);
+ final VerifiableComponentFactory verifiableComponentFactory =
Mockito.mock(VerifiableComponentFactory.class);
final LoggableComponent<Processor> loggableComponent = new
LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(),
null);
final ProcessorNode procNode = new
StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
new StandardValidationContextFactory(serviceProvider),
- scheduler, serviceProvider, reloadComponent, extensionManager, new
SynchronousValidationTrigger());
+ scheduler, serviceProvider, reloadComponent,
verifiableComponentFactory, extensionManager, new
SynchronousValidationTrigger());
rootGroup.addProcessor(procNode);
@@ -553,10 +559,12 @@ public class TestStandardProcessScheduler {
proc.initialize(new
StandardProcessorInitializationContext(UUID.randomUUID().toString(), null,
null, null, KerberosConfig.NOT_CONFIGURED));
final ReloadComponent reloadComponent =
Mockito.mock(ReloadComponent.class);
+ final VerifiableComponentFactory verifiableComponentFactory =
Mockito.mock(VerifiableComponentFactory.class);
final LoggableComponent<Processor> loggableComponent = new
LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(),
null);
final ProcessorNode procNode = new
StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
- new StandardValidationContextFactory(serviceProvider), scheduler,
serviceProvider, reloadComponent, extensionManager, new
SynchronousValidationTrigger());
+ new StandardValidationContextFactory(serviceProvider), scheduler,
serviceProvider, reloadComponent,
+ verifiableComponentFactory, extensionManager, new
SynchronousValidationTrigger());
rootGroup.addProcessor(procNode);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
index 84d5a27f22..a3af19cc35 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
@@ -20,6 +20,7 @@ import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ExtensionBuilder;
import org.apache.nifi.controller.NodeTypeProvider;
@@ -226,6 +227,7 @@ class StandardControllerServiceProviderTest {
.nodeTypeProvider(mock(NodeTypeProvider.class))
.validationTrigger(mock(ValidationTrigger.class))
.reloadComponent(mock(ReloadComponent.class))
+
.verifiableComponentFactory(mock(VerifiableComponentFactory.class))
.stateManagerProvider(mock(StateManagerProvider.class))
.extensionManager(extensionManager)
.buildControllerService();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 05dcaafa27..574609e237 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -26,6 +26,7 @@ import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ExtensionBuilder;
import org.apache.nifi.controller.FlowController;
@@ -183,6 +184,7 @@ public class TestStandardControllerServiceProvider {
.nodeTypeProvider(Mockito.mock(NodeTypeProvider.class))
.validationTrigger(Mockito.mock(ValidationTrigger.class))
.reloadComponent(Mockito.mock(ReloadComponent.class))
+
.verifiableComponentFactory(Mockito.mock(VerifiableComponentFactory.class))
.stateManagerProvider(Mockito.mock(StateManagerProvider.class))
.extensionManager(extensionManager)
.buildControllerService();
@@ -397,6 +399,7 @@ public class TestStandardControllerServiceProvider {
private ProcessorNode createProcessor(final StandardProcessScheduler
scheduler, final ControllerServiceProvider serviceProvider) {
final ReloadComponent reloadComponent =
Mockito.mock(ReloadComponent.class);
+ final VerifiableComponentFactory verifiableComponentFactory =
Mockito.mock(VerifiableComponentFactory.class);
final Processor processor = new DummyProcessor();
final MockProcessContext context = new MockProcessContext(processor,
Mockito.mock(StateManager.class));
@@ -406,7 +409,7 @@ public class TestStandardControllerServiceProvider {
final LoggableComponent<Processor> dummyProcessor = new
LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(),
null);
final ProcessorNode procNode = new
StandardProcessorNode(dummyProcessor, mockInitContext.getIdentifier(),
new StandardValidationContextFactory(serviceProvider),
scheduler, serviceProvider,
- reloadComponent, extensionManager, new
SynchronousValidationTrigger());
+ reloadComponent, verifiableComponentFactory, extensionManager,
new SynchronousValidationTrigger());
final FlowManager flowManager = Mockito.mock(FlowManager.class);
final FlowController flowController =
Mockito.mock(FlowController.class);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
index dac80d1bd7..bce59f06ee 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
@@ -24,6 +24,7 @@ import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.LoggableComponent;
@@ -135,12 +136,13 @@ public class ComponentBuilder {
final ProcessScheduler processScheduler =
statelessEngine.getProcessScheduler();
final ControllerServiceProvider controllerServiceProvider =
statelessEngine.getControllerServiceProvider();
final ReloadComponent reloadComponent =
statelessEngine.getReloadComponent();
+ final VerifiableComponentFactory verifiableComponentFactory =
statelessEngine.getVerifiableComponentFactory();
final ExtensionManager extensionManager =
statelessEngine.getExtensionManager();
final ValidationTrigger validationTrigger =
statelessEngine.getValidationTrigger();
final ValidationContextFactory validationContextFactory = new
StandardValidationContextFactory(controllerServiceProvider);
final ProcessorNode procNode = new
StandardProcessorNode(loggableProcessor, identifier, validationContextFactory,
processScheduler, controllerServiceProvider,
- reloadComponent, extensionManager, validationTrigger);
+ reloadComponent, verifiableComponentFactory, extensionManager,
validationTrigger);
loggingContext.setComponent(procNode);
logger.info("Created Processor of type {} with identifier {}", type,
identifier);
@@ -247,6 +249,7 @@ public class ComponentBuilder {
final ControllerServiceProvider serviceProvider =
statelessEngine.getControllerServiceProvider();
final KerberosConfig kerberosConfig =
statelessEngine.getKerberosConfig();
final ReloadComponent reloadComponent =
statelessEngine.getReloadComponent();
+ final VerifiableComponentFactory verifiableComponentFactory =
statelessEngine.getVerifiableComponentFactory();
final ValidationTrigger validationTrigger =
statelessEngine.getValidationTrigger();
final NodeTypeProvider nodeTypeProvider = new
StatelessNodeTypeProvider();
@@ -298,7 +301,7 @@ public class ComponentBuilder {
final ValidationContextFactory validationContextFactory = new
StandardValidationContextFactory(serviceProvider);
final ControllerServiceNode serviceNode = new
StandardControllerServiceNode(originalLoggableComponent,
proxiedLoggableComponent, invocationHandler,
- identifier, validationContextFactory, serviceProvider,
reloadComponent, extensionManager, validationTrigger);
+ identifier, validationContextFactory, serviceProvider,
reloadComponent, verifiableComponentFactory, extensionManager,
validationTrigger);
serviceNode.setName(rawClass.getSimpleName());
loggingContext.setComponent(serviceNode);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 80d45d9ef2..29b94d45bf 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -31,6 +31,7 @@ import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.components.validation.StandardValidationTrigger;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ReloadComponent;
@@ -124,6 +125,7 @@ public class StandardStatelessEngine implements
StatelessEngine {
// Member Variables created/managed internally
private final ReloadComponent reloadComponent;
+ private final VerifiableComponentFactory verifiableComponentFactory;
private final ValidationTrigger validationTrigger;
// Member Variables injected via initialization. Effectively final.
@@ -150,6 +152,7 @@ public class StandardStatelessEngine implements
StatelessEngine {
this.componentEnableTimeout =
parseDuration(builder.componentEnableTimeout);
this.reloadComponent = new StatelessReloadComponent(this);
+ this.verifiableComponentFactory = new
StatelessVerifiableComponentFactory(stateManagerProvider,
controllerServiceProvider, kerberosConfig);
this.validationTrigger = new StandardValidationTrigger(new
FlowEngine(1, "Component Validation", true), () -> true);
}
@@ -786,4 +789,9 @@ public class StandardStatelessEngine implements
StatelessEngine {
return DEFAULT_STATUS_TASK_PERIOD;
}
}
+
+ @Override
+ public VerifiableComponentFactory getVerifiableComponentFactory() {
+ return verifiableComponentFactory;
+ }
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
index 54bd53c625..16ba936c46 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
@@ -20,6 +20,7 @@ package org.apache.nifi.stateless.engine;
import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.flow.FlowManager;
@@ -74,4 +75,6 @@ public interface StatelessEngine {
Duration getStatusTaskInterval();
AssetManager getAssetManager();
+
+ VerifiableComponentFactory getVerifiableComponentFactory();
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessVerifiableComponentFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessVerifiableComponentFactory.java
new file mode 100644
index 0000000000..0f302bbe6d
--- /dev/null
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessVerifiableComponentFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.engine;
+
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.VerifiableComponentFactory;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.VerifiableControllerService;
+import
org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import
org.apache.nifi.controller.service.StandardControllerServiceInitializationContext;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.StandardProcessorInitializationContext;
+import org.apache.nifi.processor.VerifiableProcessor;
+
+public class StatelessVerifiableComponentFactory implements
VerifiableComponentFactory {
+
+ private final StateManagerProvider stateManagerProvider;
+ private final ControllerServiceProvider controllerServiceProvider;
+ private final KerberosConfig kerberosConfig;
+
+ public StatelessVerifiableComponentFactory(final StateManagerProvider
stateManagerProvider, final ControllerServiceProvider
controllerServiceProvider, final KerberosConfig kerberosConfig) {
+ this.stateManagerProvider = stateManagerProvider;
+ this.controllerServiceProvider = controllerServiceProvider;
+ this.kerberosConfig = kerberosConfig;
+ }
+
+ @Override
+ public VerifiableProcessor createProcessor(final ProcessorNode
processorNode, final ClassLoader classLoader) throws
ProcessorInstantiationException {
+ final VerifiableProcessor verifiableProcessor;
+ final String identifier = processorNode.getIdentifier();
+ final String processorClassName =
processorNode.getProcessor().getClass().getName();
+ try {
+ final Class<?> rawProcessorClass =
Class.forName(processorClassName, true, classLoader);
+ final Class<? extends VerifiableProcessor> processorClass =
rawProcessorClass.asSubclass(VerifiableProcessor.class);
+ verifiableProcessor =
processorClass.getDeclaredConstructor().newInstance();
+
+ final ProcessorInitializationContext tempInitializationContext =
new StandardProcessorInitializationContext(identifier,
processorNode.getLogger(),
+ controllerServiceProvider, new
StatelessNodeTypeProvider(), kerberosConfig);
+ if (verifiableProcessor instanceof Processor processor) {
+ processor.initialize(tempInitializationContext);
+ }
+ } catch (Exception e) {
+ throw new ProcessorInstantiationException("Failed to instantiate
Verifiable Processor Class [%s]".formatted(processorClassName), e);
+ }
+ return verifiableProcessor;
+ }
+
+ @Override
+ public VerifiableControllerService createControllerService(final
ControllerServiceNode serviceNode, final ClassLoader classLoader) {
+ final VerifiableControllerService verifiableControllerService;
+ final String identifier = serviceNode.getIdentifier();
+ final String controllerServiceClassName =
serviceNode.getCanonicalClassName();
+ try {
+ final Class<?> rawControllorServiceClass =
Class.forName(controllerServiceClassName, true, classLoader);
+ final Class<? extends VerifiableControllerService>
controllerServiceClass =
rawControllorServiceClass.asSubclass(VerifiableControllerService.class);
+ verifiableControllerService =
controllerServiceClass.getDeclaredConstructor().newInstance();
+
+ final ControllerServiceInitializationContext
tempInitializationContext = new
StandardControllerServiceInitializationContext(identifier,
+ serviceNode.getLogger(),
+ controllerServiceProvider,
stateManagerProvider.getStateManager(identifier),
+ kerberosConfig, new StatelessNodeTypeProvider());
+ if (verifiableControllerService instanceof ControllerService
controllerService) {
+ controllerService.initialize(tempInitializationContext);
+ }
+ } catch (Exception e) {
+ throw new ControllerServiceInstantiationException("Failed to
instantiate Verifiable Controller Service Class
[%s]".formatted(controllerServiceClassName), e);
+ }
+ return verifiableControllerService;
+ }
+
+}