This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6adfb6131d NIFI-11310: Provide appropriate classpath resources to the
ReloadComponent when a processor is terminated
6adfb6131d is described below
commit 6adfb6131db831c9fa33bf1292b7b225482b0035
Author: Mark Payne <[email protected]>
AuthorDate: Mon Mar 20 15:42:34 2023 -0400
NIFI-11310: Provide appropriate classpath resources to the ReloadComponent
when a processor is terminated
NIFI-11310: Fixed META-INF/services file that was mistakenly listing an
extra extension point, due to rebase
Signed-off-by: Matthew Burgess <[email protected]>
This closes #7061
---
.../scheduling/StandardProcessScheduler.java | 6 +-
.../tests/system/DynamicallyModifyClasspath.java | 125 ++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../processor/DynamicClassPathModificationIT.java | 191 +++++++++++++++++++++
.../cli/impl/client/nifi/ProcessorClient.java | 2 +
.../client/nifi/impl/JerseyProcessorClient.java | 14 ++
6 files changed, 337 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 14019dca2d..87a5ac558f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -56,8 +56,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
+import java.net.URL;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -440,7 +441,8 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
getSchedulingAgent(procNode).incrementMaxThreadCount(tasksTerminated);
try {
- flowController.getReloadComponent().reload(procNode,
procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(),
Collections.emptySet());
+ final Set<URL> additionalUrls =
procNode.getAdditionalClasspathResources(procNode.getPropertyDescriptors());
+ flowController.getReloadComponent().reload(procNode,
procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(),
additionalUrls);
} catch (final ProcessorInstantiationException e) {
// This shouldn't happen because we already have been able to
instantiate the processor before
LOG.error("Failed to replace instance of Processor for {} when
terminating Processor", procNode);
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DynamicallyModifyClasspath.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DynamicallyModifyClasspath.java
new file mode 100644
index 0000000000..3e77c010d4
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DynamicallyModifyClasspath.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.BufferedWriter;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@RequiresInstanceClassLoading
+public class DynamicallyModifyClasspath extends AbstractProcessor {
+
+ static final PropertyDescriptor URLS = new PropertyDescriptor.Builder()
+ .name("URLs to Load")
+ .description("URLs to load onto the classpath")
+ .required(false)
+ .dynamicallyModifiesClasspath(true)
+ .identifiesExternalResource(ResourceCardinality.MULTIPLE,
ResourceType.URL, ResourceType.FILE, ResourceType.DIRECTORY)
+ .build();
+
+ static final PropertyDescriptor CLASS_TO_LOAD = new
PropertyDescriptor.Builder()
+ .name("Class to Load")
+ .description("The name of the Class to load")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor SLEEP_DURATION = new
PropertyDescriptor.Builder()
+ .name("Sleep Duration")
+ .description("Amount of time to sleep in the onTrigger method")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue("0 sec")
+ .build();
+
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles are routed to this relationship if the
specified class can be loaded")
+ .build();
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles are routed to this relationship if the
specified class cannot be loaded")
+ .build();
+
+
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Arrays.asList(URLS, CLASS_TO_LOAD, SLEEP_DURATION);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
+ }
+
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long sleepMillis =
context.getProperty(SLEEP_DURATION).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
+ if (sleepMillis > 0) {
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ final String classToLoad =
context.getProperty(CLASS_TO_LOAD).getValue();
+ try {
+ final Class<?> clazz = Class.forName(classToLoad);
+ try (final OutputStream out = session.write(flowFile);
+ final OutputStreamWriter streamWriter = new
OutputStreamWriter(out);
+ final BufferedWriter writer = new
BufferedWriter(streamWriter)) {
+
+ writer.write(clazz.getName());
+ writer.newLine();
+ writer.write(clazz.getClassLoader().toString());
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (final Exception e) {
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b0262a6dce..83e1f6a3fd 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -20,6 +20,7 @@
org.apache.nifi.processors.tests.system.ConcatenateRangeOfFlowFiles
org.apache.nifi.processors.tests.system.DependOnProperties
org.apache.nifi.processors.tests.system.DoNotTransferFlowFile
org.apache.nifi.processors.tests.system.Duplicate
+org.apache.nifi.processors.tests.system.DynamicallyModifyClasspath
org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect
org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
org.apache.nifi.processors.tests.system.FakeProcessor
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java
new file mode 100644
index 0000000000..a5561e40bf
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.processor;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DynamicClassPathModificationIT extends NiFiSystemIT {
+
+ private ProcessorEntity generate;
+ private ProcessorEntity modify;
+
+ private ConnectionEntity modifyInputConnection;
+ private ConnectionEntity successConnection;
+ private ConnectionEntity failureConnection;
+
+
+ @Test
+ public void testLoadsClassOnBaseClasspath() throws NiFiClientException,
IOException, InterruptedException {
+ createFlow();
+
+ // Configure with a class that is always on the classpath
+ final Map<String, String> propertyMap = new HashMap<>();
+ propertyMap.put("Class to Load", "org.apache.nifi.flowfile.FlowFile");
+ getClientUtil().updateProcessorProperties(modify, propertyMap);
+ getClientUtil().waitForValidProcessor(modify.getId());
+
+ // Let Generate create a FlowFile
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(modifyInputConnection.getId(), 1);
+
+ // Start the processor and make sure that the FlowFile is routed to
success
+ getClientUtil().startProcessor(modify);
+ waitForQueueCount(successConnection.getId(), 1);
+ assertEquals(0, getConnectionQueueSize(failureConnection.getId()));
+ }
+
+ @Test
+ public void testLoadsClassFromDynamicModification() throws
NiFiClientException, IOException, InterruptedException {
+ createFlow();
+
+ final Map<String, String> propertyMap = new HashMap<>();
+ propertyMap.put("Class to Load",
"org.apache.commons.lang3.StringUtils");
+ getClientUtil().updateProcessorProperties(modify, propertyMap);
+ getClientUtil().waitForValidProcessor(modify.getId());
+
+ // Let Generate create a FlowFile
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(modifyInputConnection.getId(), 1);
+
+ // Start the processor and we expect the FlowFile to go to failure
because the StringUtils class should not be available
+ getClientUtil().startProcessor(modify);
+ waitForQueueCount(failureConnection.getId(), 1);
+ assertEquals(0, getConnectionQueueSize(successConnection.getId()));
+
+ getClientUtil().stopProcessor(modify);
+ getClientUtil().stopProcessor(generate);
+
+ getClientUtil().waitForStoppedProcessor(modify.getId());
+ getClientUtil().waitForStoppedProcessor(generate.getId());
+
+ // Update modify to have the appropriate URL
+ propertyMap.put("URLs to Load",
getCommonsLangJar().toURI().toURL().toString());
+ getClientUtil().updateProcessorProperties(modify, propertyMap);
+ getClientUtil().waitForValidProcessor(modify.getId());
+
+ // Let Generate create another FlowFile
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(modifyInputConnection.getId(), 1);
+
+ // Wait for a FlowFile to be routed to success
+ getClientUtil().startProcessor(modify);
+ waitForQueueCount(successConnection.getId(), 1);
+
+ getClientUtil().stopProcessor(generate);
+ getClientUtil().waitForStoppedProcessor(generate.getId());
+
+ // Restart and ensure that everything works as expected after restart
+ getNiFiInstance().stop();
+ getNiFiInstance().start(true);
+
+ // Feed another FlowFile through. Upon restart, in order to modify, we
need to get the most up-to-date revision so will first fetch the Processor
+ final ProcessorEntity generateAfterRestart =
getNifiClient().getProcessorClient().getProcessor(generate.getId());
+ getClientUtil().startProcessor(generateAfterRestart);
+
+ // Depending on whether or not the flow was written out with the
processor running, the Modify processor may or may not be running. Ensure that
it is running.
+ getClientUtil().waitForValidationCompleted(modify);
+ final ProcessorEntity modifyAfterRestart =
getNifiClient().getProcessorClient().getProcessor(modify.getId());
+ final String modifyRunStatus =
modifyAfterRestart.getStatus().getRunStatus();
+ if (!"Running".equalsIgnoreCase(modifyRunStatus)) {
+ getClientUtil().startProcessor(modifyAfterRestart);
+ }
+
+ // We now expect 2 FlowFiles to be in the success route
+ waitForQueueCount(successConnection.getId(), 2);
+ }
+
+
+ @Test
+ public void testSuccessAfterTerminate() throws NiFiClientException,
IOException, InterruptedException {
+ createFlow();
+
+ // Configure so that the processor should succeed but sleep for 5 mins
so that we can terminate it
+ final Map<String, String> propertyMap = new HashMap<>();
+ propertyMap.put("Class to Load",
"org.apache.commons.lang3.StringUtils");
+ propertyMap.put("URLs to Load",
getCommonsLangJar().toURI().toURL().toString());
+ propertyMap.put("Sleep Duration", "${sleep}");
+ getClientUtil().updateProcessorProperties(modify, propertyMap);
+ getClientUtil().waitForValidProcessor(modify.getId());
+
+ // Tell Generate Processor to add an attribute named 'sleep' with 5
mins as the value
+ getClientUtil().updateProcessorProperties(generate,
Collections.singletonMap("sleep", "5 mins"));
+ getClientUtil().waitForValidProcessor(generate.getId());
+
+ // Let Generate create a FlowFile
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(modifyInputConnection.getId(), 1);
+
+ // Start the processor, wait a bit, stop it and terminate it.
+ getClientUtil().startProcessor(modify);
+ Thread.sleep(2000L);
+ getNifiClient().getProcessorClient().stopProcessor(modify);
+
getNifiClient().getProcessorClient().terminateProcessor(modify.getId());
+ getClientUtil().waitForStoppedProcessor(modify.getId());
+
+ // Empty the queue and generate another FlowFile with a sleep of 0 sec
+ getClientUtil().emptyQueue(modifyInputConnection.getId());
+ getClientUtil().stopProcessor(generate);
+ getClientUtil().updateProcessorProperties(generate,
Collections.singletonMap("sleep", "0 sec"));
+ getClientUtil().waitForValidProcessor(generate.getId());
+ getClientUtil().startProcessor(generate);
+
+ // Start processor and expect data to go to 'success'. This time the
processor will not sleep in onTrigger because
+ // it is configured to sleep only on the first iteration after an
update.
+ getClientUtil().startProcessor(modify);
+ waitForQueueCount(successConnection.getId(), 1);
+ }
+
+ private File getCommonsLangJar() {
+ final File bootstrapLib = new
File(getNiFiInstance().getInstanceDirectory(), "lib/bootstrap");
+ final File[] commonsLangJars = bootstrapLib.listFiles(file ->
file.getName().startsWith("commons-lang"));
+ if (commonsLangJars == null || commonsLangJars.length == 0) {
+ throw new IllegalStateException("Could not find commons-lang jar
in bootstrap lib directory");
+ }
+
+ if (commonsLangJars.length > 1) {
+ throw new IllegalStateException("Found multiple commons-lang jars
in bootstrap lib directory");
+ }
+
+ return commonsLangJars[0];
+ }
+
+ // We have several tests running the same flow but with different
configuration. Since we need to reference the ProcessorEntities and
ConnectionEntities, we have a method
+ // that creates the flow and stores the entities are member variables
+ private void createFlow() throws NiFiClientException, IOException {
+ generate = getClientUtil().createProcessor("GenerateFlowFile");
+ modify = getClientUtil().createProcessor("DynamicallyModifyClasspath");
+ ProcessorEntity terminateSuccess =
getClientUtil().createProcessor("TerminateFlowFile");
+ ProcessorEntity terminateFailure =
getClientUtil().createProcessor("TerminateFlowFile");
+
+ modifyInputConnection = getClientUtil().createConnection(generate,
modify, "success");
+ successConnection = getClientUtil().createConnection(modify,
terminateSuccess, "success");
+ failureConnection = getClientUtil().createConnection(modify,
terminateFailure, "failure");
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java
index 1b355fbe8c..40a6c6537c 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java
@@ -57,6 +57,8 @@ public interface ProcessorClient {
PropertyDescriptorEntity getPropertyDescriptor(String processorId, String
propertyName, Boolean sensitive) throws NiFiClientException, IOException;
+ ProcessorEntity terminateProcessor(String processorId) throws
NiFiClientException, IOException;
+
/**
* Indicates that mutable requests should indicate that the client has
acknowledged that the node is disconnected.
*/
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java
index 3de97619b7..bcd22ed046 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java
@@ -275,4 +275,18 @@ public class JerseyProcessorClient extends
AbstractJerseyClient implements Proce
return
getRequestBuilder(target).get(PropertyDescriptorEntity.class);
});
}
+
+ @Override
+ public ProcessorEntity terminateProcessor(final String processorId) throws
NiFiClientException, IOException {
+ Objects.requireNonNull(processorId, "Processor ID required");
+
+ return executeAction("Error terminating Processor", () -> {
+ final WebTarget target = processorTarget
+ .path("/threads")
+ .resolveTemplate("id", processorId);
+
+ return getRequestBuilder(target).delete(ProcessorEntity.class);
+ });
+ }
+
}